##// END OF EJS Templates
add --queue to ipcluster's SGE/PBS/LSF subcmds
Justin Riley -
Show More
@@ -277,13 +277,16 b' class BatchEngineSet(object):'
277 277 name = ''
278 278 submit_command = ''
279 279 delete_command = ''
280 script_param_prefix = ''
281 280 job_id_regexp = ''
282 281 job_array_regexp = ''
282 job_array_template = ''
283 queue_regexp = ''
284 queue_template = ''
283 285 default_template = ''
284 286
285 def __init__(self, template_file, **kwargs):
287 def __init__(self, template_file, queue, **kwargs):
286 288 self.template_file = template_file
289 self.queue = queue
287 290
288 291 def parse_job_id(self, output):
289 292 m = re.search(self.job_id_regexp, output)
@@ -302,19 +305,31 b' class BatchEngineSet(object):'
302 305 def start(self, n):
303 306 log.msg("starting %d engines" % n)
304 307 self._temp_file = tempfile.NamedTemporaryFile()
305 regex = re.compile(self.job_array_regexp)
306 308 if self.template_file:
307 309 log.msg("Using %s script %s" % (self.name, self.template_file))
308 310 contents = open(self.template_file, 'r').read()
311 new_script = contents
312 regex = re.compile(self.job_array_regexp)
309 313 if not regex.search(contents):
310 314 log.msg("adding job array settings to %s script" % self.name)
311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
312 self._temp_file.write(contents)
315 new_script = self.job_array_template % n +'\n' + new_script
316 print self.queue_regexp
317 regex = re.compile(self.queue_regexp)
318 print regex.search(contents)
319 if self.queue and not regex.search(contents):
320 log.msg("adding queue settings to %s script" % self.name)
321 new_script = self.queue_template % self.queue + '\n' + new_script
322 if new_script != contents:
323 self._temp_file.write(new_script)
313 324 self.template_file = self._temp_file.name
314 325 else:
326 default_script = self.default_template % n
327 if self.queue:
328 default_script = self.queue_template % self.queue + \
329 '\n' + default_script
315 330 log.msg("using default ipengine %s script: \n%s" %
316 (self.name, (self.default_template % n)))
317 self._temp_file.file.write(self.default_template % n)
331 (self.name, default_script))
332 self._temp_file.file.write(default_script)
318 333 self.template_file = self._temp_file.name
319 334 self._temp_file.file.flush()
320 335 d = getProcessOutput(self.submit_command,
@@ -334,9 +349,11 b' class PBSEngineSet(BatchEngineSet):'
334 349 name = 'PBS'
335 350 submit_command = 'qsub'
336 351 delete_command = 'qdel'
337 script_param_prefix = "#PBS"
338 352 job_id_regexp = '\d+'
339 353 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
354 job_array_template = '#PBS -t 1-%d'
355 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
356 queue_template = '#PBS -q %s'
340 357 default_template="""#PBS -V
341 358 #PBS -t 1-%d
342 359 #PBS -N ipengine
@@ -347,8 +364,10 b' ipengine --logfile=ipengine${eid}.log'
347 364 class SGEEngineSet(PBSEngineSet):
348 365
349 366 name = 'SGE'
350 script_param_prefix = "#$"
351 367 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
368 job_array_template = '#$ -t 1-%d'
369 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
370 queue_template = '#$ -q %s'
352 371 default_template="""#$ -V
353 372 #$ -t 1-%d
354 373 #$ -N ipengine
@@ -361,9 +380,11 b' class LSFEngineSet(PBSEngineSet):'
361 380 name = 'LSF'
362 381 submit_command = 'bsub'
363 382 delete_command = 'bkill'
364 script_param_prefix = "#BSUB"
365 job_array_regexp = '#BSUB[ \t]+\w+\[\d+-\d+\]'
366 default_template="""#BSUB ipengine[1-%d]
383 job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
384 job_array_template = '#BSUB -J ipengine[1-%d]'
385 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
386 queue_template = '#BSUB -q %s'
387 default_template="""#BSUB -J ipengine[1-%d]
367 388 eid=$(($LSB_JOBINDEX - 1))
368 389 ipengine --logfile=ipengine${eid}.log
369 390 """
@@ -654,7 +675,7 b' def main_pbs(args):'
654 675 cl = ControllerLauncher(extra_args=cont_args)
655 676 dstart = cl.start()
656 677 def start_engines(r):
657 pbs_set = PBSEngineSet(args.pbsscript)
678 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
658 679 def shutdown(signum, frame):
659 680 log.msg('Stopping PBS cluster')
660 681 d = pbs_set.kill()
@@ -687,7 +708,7 b' def main_sge(args):'
687 708 cl = ControllerLauncher(extra_args=cont_args)
688 709 dstart = cl.start()
689 710 def start_engines(r):
690 sge_set = SGEEngineSet(args.sgescript)
711 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
691 712 def shutdown(signum, frame):
692 713 log.msg('Stopping sge cluster')
693 714 d = sge_set.kill()
@@ -720,7 +741,7 b' def main_lsf(args):'
720 741 cl = ControllerLauncher(extra_args=cont_args)
721 742 dstart = cl.start()
722 743 def start_engines(r):
723 lsf_set = LSFEngineSet(args.lsfscript)
744 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
724 745 def shutdown(signum, frame):
725 746 log.msg('Stopping LSF cluster')
726 747 d = lsf_set.kill()
@@ -879,12 +900,21 b' def get_args():'
879 900 parents=[base_parser]
880 901 )
881 902 parser_pbs.add_argument(
903 '-s',
882 904 '--pbs-script',
883 905 type=str,
884 906 dest='pbsscript',
885 907 help='PBS script template',
886 908 default=''
887 909 )
910 parser_pbs.add_argument(
911 '-q',
912 '--queue',
913 type=str,
914 dest='pbsqueue',
915 help='PBS queue to use when starting the engines',
916 default=None,
917 )
888 918 parser_pbs.set_defaults(func=main_pbs)
889 919
890 920 parser_sge = subparsers.add_parser(
@@ -893,12 +923,21 b' def get_args():'
893 923 parents=[base_parser]
894 924 )
895 925 parser_sge.add_argument(
926 '-s',
896 927 '--sge-script',
897 928 type=str,
898 929 dest='sgescript',
899 930 help='SGE script template',
900 931 default='' # SGEEngineSet will create one if not specified
901 932 )
933 parser_sge.add_argument(
934 '-q',
935 '--queue',
936 type=str,
937 dest='sgequeue',
938 help='SGE queue to use when starting the engines',
939 default=None,
940 )
902 941 parser_sge.set_defaults(func=main_sge)
903 942
904 943 parser_lsf = subparsers.add_parser(
@@ -906,13 +945,24 b' def get_args():'
906 945 help='run an lsf cluster',
907 946 parents=[base_parser]
908 947 )
948
909 949 parser_lsf.add_argument(
950 '-s',
910 951 '--lsf-script',
911 952 type=str,
912 953 dest='lsfscript',
913 954 help='LSF script template',
914 955 default='' # LSFEngineSet will create one if not specified
915 956 )
957
958 parser_lsf.add_argument(
959 '-q',
960 '--queue',
961 type=str,
962 dest='lsfqueue',
963 help='LSF queue to use when starting the engines',
964 default=None,
965 )
916 966 parser_lsf.set_defaults(func=main_lsf)
917 967
918 968 parser_ssh = subparsers.add_parser(
General Comments 0
You need to be logged in to leave comments. Login now