Show More
@@ -271,22 +271,22 b' class LocalEngineSet(object):' | |||||
271 | dfinal.addCallback(self._handle_stop) |
|
271 | dfinal.addCallback(self._handle_stop) | |
272 | return dfinal |
|
272 | return dfinal | |
273 |
|
273 | |||
274 |
|
||||
275 | class BatchEngineSet(object): |
|
274 | class BatchEngineSet(object): | |
276 |
|
275 | |||
277 | # Subclasses must fill these in. See PBSEngineSet |
|
276 | # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet | |
|
277 | name = '' | |||
278 | submit_command = '' |
|
278 | submit_command = '' | |
279 | delete_command = '' |
|
279 | delete_command = '' | |
|
280 | script_param_prefix = '' | |||
280 | job_id_regexp = '' |
|
281 | job_id_regexp = '' | |
|
282 | job_array_regexp = '' | |||
|
283 | default_template = '' | |||
281 |
|
284 | |||
282 | def __init__(self, template_file, **kwargs): |
|
285 | def __init__(self, template_file, **kwargs): | |
283 | self.template_file = template_file |
|
286 | self.template_file = template_file | |
284 | self.context = {} |
|
|||
285 | self.context.update(kwargs) |
|
|||
286 | self.batch_file = self.template_file+'-run' |
|
|||
287 |
|
287 | |||
288 | def parse_job_id(self, output): |
|
288 | def parse_job_id(self, output): | |
289 |
m = re. |
|
289 | m = re.search(self.job_id_regexp, output) | |
290 | if m is not None: |
|
290 | if m is not None: | |
291 | job_id = m.group() |
|
291 | job_id = m.group() | |
292 | else: |
|
292 | else: | |
@@ -295,24 +295,31 b' class BatchEngineSet(object):' | |||||
295 | log.msg('Job started with job id: %r' % job_id) |
|
295 | log.msg('Job started with job id: %r' % job_id) | |
296 | return job_id |
|
296 | return job_id | |
297 |
|
297 | |||
298 | def write_batch_script(self, n): |
|
|||
299 | self.context['n'] = n |
|
|||
300 | template = open(self.template_file, 'r').read() |
|
|||
301 | log.msg('Using template for batch script: %s' % self.template_file) |
|
|||
302 | script_as_string = Itpl.itplns(template, self.context) |
|
|||
303 | log.msg('Writing instantiated batch script: %s' % self.batch_file) |
|
|||
304 | f = open(self.batch_file,'w') |
|
|||
305 | f.write(script_as_string) |
|
|||
306 | f.close() |
|
|||
307 |
|
||||
308 | def handle_error(self, f): |
|
298 | def handle_error(self, f): | |
309 | f.printTraceback() |
|
299 | f.printTraceback() | |
310 | f.raiseException() |
|
300 | f.raiseException() | |
311 |
|
301 | |||
312 | def start(self, n): |
|
302 | def start(self, n): | |
313 | self.write_batch_script(n) |
|
303 | log.msg("starting %d engines" % n) | |
|
304 | self._temp_file = tempfile.NamedTemporaryFile() | |||
|
305 | regex = re.compile(self.job_array_regexp) | |||
|
306 | if self.template_file: | |||
|
307 | log.msg("Using %s script %s" % (self.name, self.template_file)) | |||
|
308 | contents = open(self.template_file, 'r').read() | |||
|
309 | if not regex.search(contents): | |||
|
310 | log.msg("adding job array settings to %s script" % self.name) | |||
|
311 | contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents | |||
|
312 | self._temp_file.write(contents) | |||
|
313 | self.template_file = self._temp_file.name | |||
|
314 | else: | |||
|
315 | log.msg("using default ipengine %s script: \n%s" % | |||
|
316 | (self.name, (self.default_template % n))) | |||
|
317 | self._temp_file.file.write(self.default_template % n) | |||
|
318 | self.template_file = self._temp_file.name | |||
|
319 | self._temp_file.file.flush() | |||
314 | d = getProcessOutput(self.submit_command, |
|
320 | d = getProcessOutput(self.submit_command, | |
315 | [self.batch_file],env=os.environ) |
|
321 | [self.template_file], | |
|
322 | env=os.environ) | |||
316 | d.addCallback(self.parse_job_id) |
|
323 | d.addCallback(self.parse_job_id) | |
317 | d.addErrback(self.handle_error) |
|
324 | d.addErrback(self.handle_error) | |
318 | return d |
|
325 | return d | |
@@ -324,55 +331,25 b' class BatchEngineSet(object):' | |||||
324 |
|
331 | |||
325 | class PBSEngineSet(BatchEngineSet): |
|
332 | class PBSEngineSet(BatchEngineSet): | |
326 |
|
333 | |||
|
334 | name = 'PBS' | |||
327 | submit_command = 'qsub' |
|
335 | submit_command = 'qsub' | |
328 | delete_command = 'qdel' |
|
336 | delete_command = 'qdel' | |
|
337 | script_param_prefix = "#PBS" | |||
329 | job_id_regexp = '\d+' |
|
338 | job_id_regexp = '\d+' | |
330 |
|
339 | job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+' | ||
331 | def __init__(self, template_file, **kwargs): |
|
340 | default_template="""#PBS -V | |
332 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
341 | #PBS -t 1-%d | |
|
342 | #PBS -N ipengine | |||
|
343 | eid=$(($PBS_ARRAYID - 1)) | |||
|
344 | ipengine --logfile=ipengine${eid}.log | |||
|
345 | """ | |||
333 |
|
346 | |||
334 | class SGEEngineSet(PBSEngineSet): |
|
347 | class SGEEngineSet(PBSEngineSet): | |
335 |
|
348 | |||
336 | def __init__(self, template_file, **kwargs): |
|
349 | name = 'SGE' | |
337 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
350 | script_param_prefix = "#$" | |
338 | self._temp_file = None |
|
351 | job_array_regexp = '#\$[ \t]+-t[ \t]+\d+' | |
339 |
|
352 | default_template="""#$ -V | ||
340 | def parse_job_id(self, output): |
|
|||
341 | m = re.search(self.job_id_regexp, output) |
|
|||
342 | if m is not None: |
|
|||
343 | job_id = m.group() |
|
|||
344 | else: |
|
|||
345 | raise Exception("job id couldn't be determined: %s" % output) |
|
|||
346 | self.job_id = job_id |
|
|||
347 | log.msg('job started with job id: %r' % job_id) |
|
|||
348 | return job_id |
|
|||
349 |
|
||||
350 | def start(self, n): |
|
|||
351 | log.msg("starting %d engines" % n) |
|
|||
352 | self._temp_file = tempfile.NamedTemporaryFile() |
|
|||
353 | regex = re.compile('#\$[ \t]+-t[ \t]+\d+') |
|
|||
354 | if self.template_file: |
|
|||
355 | log.msg("Using sge script %s" % self.template_file) |
|
|||
356 | contents = open(self.template_file, 'r').read() |
|
|||
357 | if not regex.search(contents): |
|
|||
358 | log.msg("adding job array settings to sge script") |
|
|||
359 | contents = ("#$ -t 1-%d\n" % n) + contents |
|
|||
360 | self._temp_file.write(contents) |
|
|||
361 | self.template_file = self._temp_file.name |
|
|||
362 | else: |
|
|||
363 | log.msg("using default ipengine sge script: \n%s" % |
|
|||
364 | (sge_template % n)) |
|
|||
365 | self._temp_file.file.write(sge_template % n) |
|
|||
366 | self.template_file = self._temp_file.name |
|
|||
367 | self._temp_file.file.flush() |
|
|||
368 | d = getProcessOutput(self.submit_command, |
|
|||
369 | [self.template_file], |
|
|||
370 | env=os.environ) |
|
|||
371 | d.addCallback(self.parse_job_id) |
|
|||
372 | d.addErrback(self.handle_error) |
|
|||
373 | return d |
|
|||
374 |
|
||||
375 | sge_template="""#$ -V |
|
|||
376 | #$ -t 1-%d |
|
353 | #$ -t 1-%d | |
377 | #$ -N ipengine |
|
354 | #$ -N ipengine | |
378 | eid=$(($SGE_TASK_ID - 1)) |
|
355 | eid=$(($SGE_TASK_ID - 1)) | |
@@ -857,7 +834,7 b' def get_args():' | |||||
857 | type=str, |
|
834 | type=str, | |
858 | dest='pbsscript', |
|
835 | dest='pbsscript', | |
859 | help='PBS script template', |
|
836 | help='PBS script template', | |
860 |
default=' |
|
837 | default='' | |
861 | ) |
|
838 | ) | |
862 | parser_pbs.set_defaults(func=main_pbs) |
|
839 | parser_pbs.set_defaults(func=main_pbs) | |
863 |
|
840 |
General Comments 0
You need to be logged in to leave comments.
Login now