##// END OF EJS Templates
add --queue to ipcluster's SGE/PBS/LSF subcmds
Justin Riley -
Show More
@@ -277,13 +277,16 b' class BatchEngineSet(object):'
277 name = ''
277 name = ''
278 submit_command = ''
278 submit_command = ''
279 delete_command = ''
279 delete_command = ''
280 script_param_prefix = ''
281 job_id_regexp = ''
280 job_id_regexp = ''
282 job_array_regexp = ''
281 job_array_regexp = ''
282 job_array_template = ''
283 queue_regexp = ''
284 queue_template = ''
283 default_template = ''
285 default_template = ''
284
286
285 def __init__(self, template_file, **kwargs):
287 def __init__(self, template_file, queue, **kwargs):
286 self.template_file = template_file
288 self.template_file = template_file
289 self.queue = queue
287
290
288 def parse_job_id(self, output):
291 def parse_job_id(self, output):
289 m = re.search(self.job_id_regexp, output)
292 m = re.search(self.job_id_regexp, output)
@@ -302,19 +305,31 b' class BatchEngineSet(object):'
302 def start(self, n):
305 def start(self, n):
303 log.msg("starting %d engines" % n)
306 log.msg("starting %d engines" % n)
304 self._temp_file = tempfile.NamedTemporaryFile()
307 self._temp_file = tempfile.NamedTemporaryFile()
305 regex = re.compile(self.job_array_regexp)
306 if self.template_file:
308 if self.template_file:
307 log.msg("Using %s script %s" % (self.name, self.template_file))
309 log.msg("Using %s script %s" % (self.name, self.template_file))
308 contents = open(self.template_file, 'r').read()
310 contents = open(self.template_file, 'r').read()
311 new_script = contents
312 regex = re.compile(self.job_array_regexp)
309 if not regex.search(contents):
313 if not regex.search(contents):
310 log.msg("adding job array settings to %s script" % self.name)
314 log.msg("adding job array settings to %s script" % self.name)
311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
315 new_script = self.job_array_template % n +'\n' + new_script
312 self._temp_file.write(contents)
316 print self.queue_regexp
317 regex = re.compile(self.queue_regexp)
318 print regex.search(contents)
319 if self.queue and not regex.search(contents):
320 log.msg("adding queue settings to %s script" % self.name)
321 new_script = self.queue_template % self.queue + '\n' + new_script
322 if new_script != contents:
323 self._temp_file.write(new_script)
313 self.template_file = self._temp_file.name
324 self.template_file = self._temp_file.name
314 else:
325 else:
326 default_script = self.default_template % n
327 if self.queue:
328 default_script = self.queue_template % self.queue + \
329 '\n' + default_script
315 log.msg("using default ipengine %s script: \n%s" %
330 log.msg("using default ipengine %s script: \n%s" %
316 (self.name, (self.default_template % n)))
331 (self.name, default_script))
317 self._temp_file.file.write(self.default_template % n)
332 self._temp_file.file.write(default_script)
318 self.template_file = self._temp_file.name
333 self.template_file = self._temp_file.name
319 self._temp_file.file.flush()
334 self._temp_file.file.flush()
320 d = getProcessOutput(self.submit_command,
335 d = getProcessOutput(self.submit_command,
@@ -334,9 +349,11 b' class PBSEngineSet(BatchEngineSet):'
334 name = 'PBS'
349 name = 'PBS'
335 submit_command = 'qsub'
350 submit_command = 'qsub'
336 delete_command = 'qdel'
351 delete_command = 'qdel'
337 script_param_prefix = "#PBS"
338 job_id_regexp = '\d+'
352 job_id_regexp = '\d+'
339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
353 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
354 job_array_template = '#PBS -t 1-%d'
355 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
356 queue_template = '#PBS -q %s'
340 default_template="""#PBS -V
357 default_template="""#PBS -V
341 #PBS -t 1-%d
358 #PBS -t 1-%d
342 #PBS -N ipengine
359 #PBS -N ipengine
@@ -347,8 +364,10 b' ipengine --logfile=ipengine${eid}.log'
347 class SGEEngineSet(PBSEngineSet):
364 class SGEEngineSet(PBSEngineSet):
348
365
349 name = 'SGE'
366 name = 'SGE'
350 script_param_prefix = "#$"
351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
367 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
368 job_array_template = '#$ -t 1-%d'
369 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
370 queue_template = '#$ -q %s'
352 default_template="""#$ -V
371 default_template="""#$ -V
353 #$ -t 1-%d
372 #$ -t 1-%d
354 #$ -N ipengine
373 #$ -N ipengine
@@ -361,9 +380,11 b' class LSFEngineSet(PBSEngineSet):'
361 name = 'LSF'
380 name = 'LSF'
362 submit_command = 'bsub'
381 submit_command = 'bsub'
363 delete_command = 'bkill'
382 delete_command = 'bkill'
364 script_param_prefix = "#BSUB"
383 job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
365 job_array_regexp = '#BSUB[ \t]+\w+\[\d+-\d+\]'
384 job_array_template = '#BSUB -J ipengine[1-%d]'
366 default_template="""#BSUB ipengine[1-%d]
385 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
386 queue_template = '#BSUB -q %s'
387 default_template="""#BSUB -J ipengine[1-%d]
367 eid=$(($LSB_JOBINDEX - 1))
388 eid=$(($LSB_JOBINDEX - 1))
368 ipengine --logfile=ipengine${eid}.log
389 ipengine --logfile=ipengine${eid}.log
369 """
390 """
@@ -654,7 +675,7 b' def main_pbs(args):'
654 cl = ControllerLauncher(extra_args=cont_args)
675 cl = ControllerLauncher(extra_args=cont_args)
655 dstart = cl.start()
676 dstart = cl.start()
656 def start_engines(r):
677 def start_engines(r):
657 pbs_set = PBSEngineSet(args.pbsscript)
678 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
658 def shutdown(signum, frame):
679 def shutdown(signum, frame):
659 log.msg('Stopping PBS cluster')
680 log.msg('Stopping PBS cluster')
660 d = pbs_set.kill()
681 d = pbs_set.kill()
@@ -687,7 +708,7 b' def main_sge(args):'
687 cl = ControllerLauncher(extra_args=cont_args)
708 cl = ControllerLauncher(extra_args=cont_args)
688 dstart = cl.start()
709 dstart = cl.start()
689 def start_engines(r):
710 def start_engines(r):
690 sge_set = SGEEngineSet(args.sgescript)
711 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
691 def shutdown(signum, frame):
712 def shutdown(signum, frame):
692 log.msg('Stopping sge cluster')
713 log.msg('Stopping sge cluster')
693 d = sge_set.kill()
714 d = sge_set.kill()
@@ -720,7 +741,7 b' def main_lsf(args):'
720 cl = ControllerLauncher(extra_args=cont_args)
741 cl = ControllerLauncher(extra_args=cont_args)
721 dstart = cl.start()
742 dstart = cl.start()
722 def start_engines(r):
743 def start_engines(r):
723 lsf_set = LSFEngineSet(args.lsfscript)
744 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
724 def shutdown(signum, frame):
745 def shutdown(signum, frame):
725 log.msg('Stopping LSF cluster')
746 log.msg('Stopping LSF cluster')
726 d = lsf_set.kill()
747 d = lsf_set.kill()
@@ -872,47 +893,76 b' def get_args():'
872 help="how to call MPI_Init (default=mpi4py)"
893 help="how to call MPI_Init (default=mpi4py)"
873 )
894 )
874 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
895 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
875
896
876 parser_pbs = subparsers.add_parser(
897 parser_pbs = subparsers.add_parser(
877 'pbs',
898 'pbs',
878 help='run a pbs cluster',
899 help='run a pbs cluster',
879 parents=[base_parser]
900 parents=[base_parser]
880 )
901 )
881 parser_pbs.add_argument(
902 parser_pbs.add_argument(
903 '-s',
882 '--pbs-script',
904 '--pbs-script',
883 type=str,
905 type=str,
884 dest='pbsscript',
906 dest='pbsscript',
885 help='PBS script template',
907 help='PBS script template',
886 default=''
908 default=''
887 )
909 )
910 parser_pbs.add_argument(
911 '-q',
912 '--queue',
913 type=str,
914 dest='pbsqueue',
915 help='PBS queue to use when starting the engines',
916 default=None,
917 )
888 parser_pbs.set_defaults(func=main_pbs)
918 parser_pbs.set_defaults(func=main_pbs)
889
919
890 parser_sge = subparsers.add_parser(
920 parser_sge = subparsers.add_parser(
891 'sge',
921 'sge',
892 help='run an sge cluster',
922 help='run an sge cluster',
893 parents=[base_parser]
923 parents=[base_parser]
894 )
924 )
895 parser_sge.add_argument(
925 parser_sge.add_argument(
926 '-s',
896 '--sge-script',
927 '--sge-script',
897 type=str,
928 type=str,
898 dest='sgescript',
929 dest='sgescript',
899 help='SGE script template',
930 help='SGE script template',
900 default='' # SGEEngineSet will create one if not specified
931 default='' # SGEEngineSet will create one if not specified
901 )
932 )
933 parser_sge.add_argument(
934 '-q',
935 '--queue',
936 type=str,
937 dest='sgequeue',
938 help='SGE queue to use when starting the engines',
939 default=None,
940 )
902 parser_sge.set_defaults(func=main_sge)
941 parser_sge.set_defaults(func=main_sge)
903
942
904 parser_lsf = subparsers.add_parser(
943 parser_lsf = subparsers.add_parser(
905 'lsf',
944 'lsf',
906 help='run an lsf cluster',
945 help='run an lsf cluster',
907 parents=[base_parser]
946 parents=[base_parser]
908 )
947 )
948
909 parser_lsf.add_argument(
949 parser_lsf.add_argument(
950 '-s',
910 '--lsf-script',
951 '--lsf-script',
911 type=str,
952 type=str,
912 dest='lsfscript',
953 dest='lsfscript',
913 help='LSF script template',
954 help='LSF script template',
914 default='' # LSFEngineSet will create one if not specified
955 default='' # LSFEngineSet will create one if not specified
915 )
956 )
957
958 parser_lsf.add_argument(
959 '-q',
960 '--queue',
961 type=str,
962 dest='lsfqueue',
963 help='LSF queue to use when starting the engines',
964 default=None,
965 )
916 parser_lsf.set_defaults(func=main_lsf)
966 parser_lsf.set_defaults(func=main_lsf)
917
967
918 parser_ssh = subparsers.add_parser(
968 parser_ssh = subparsers.add_parser(
General Comments 0
You need to be logged in to leave comments. Login now