##// END OF EJS Templates
Merge branch '0.10.1-sge' of http://github.com/satra/ipython into satra-0.10.1-sge...
Fernando Perez -
r3066:f780f2e9 merge
parent child Browse files
Show More
@@ -18,6 +18,7 b' import os'
18 18 import re
19 19 import sys
20 20 import signal
21 import stat
21 22 import tempfile
22 23 pjoin = os.path.join
23 24
@@ -234,6 +235,7 b' class LocalEngineSet(object):'
234 235 def start(self, n):
235 236 dlist = []
236 237 for i in range(n):
238 print "starting engine:", i
237 239 el = EngineLauncher(extra_args=self.extra_args)
238 240 d = el.start()
239 241 self.launchers.append(el)
@@ -270,22 +272,25 b' class LocalEngineSet(object):'
270 272 dfinal.addCallback(self._handle_stop)
271 273 return dfinal
272 274
273
274 275 class BatchEngineSet(object):
275
276 # Subclasses must fill these in. See PBSEngineSet
276
277 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
278 name = ''
277 279 submit_command = ''
278 280 delete_command = ''
279 281 job_id_regexp = ''
280
281 def __init__(self, template_file, **kwargs):
282 job_array_regexp = ''
283 job_array_template = ''
284 queue_regexp = ''
285 queue_template = ''
286 default_template = ''
287
288 def __init__(self, template_file, queue, **kwargs):
282 289 self.template_file = template_file
283 self.context = {}
284 self.context.update(kwargs)
285 self.batch_file = self.template_file+'-run'
286
290 self.queue = queue
291
287 292 def parse_job_id(self, output):
288 m = re.match(self.job_id_regexp, output)
293 m = re.search(self.job_id_regexp, output)
289 294 if m is not None:
290 295 job_id = m.group()
291 296 else:
@@ -293,46 +298,120 b' class BatchEngineSet(object):'
293 298 self.job_id = job_id
294 299 log.msg('Job started with job id: %r' % job_id)
295 300 return job_id
296
297 def write_batch_script(self, n):
298 self.context['n'] = n
299 template = open(self.template_file, 'r').read()
300 log.msg('Using template for batch script: %s' % self.template_file)
301 script_as_string = Itpl.itplns(template, self.context)
302 log.msg('Writing instantiated batch script: %s' % self.batch_file)
303 f = open(self.batch_file,'w')
304 f.write(script_as_string)
305 f.close()
306
301
307 302 def handle_error(self, f):
308 303 f.printTraceback()
309 304 f.raiseException()
310
305
311 306 def start(self, n):
312 self.write_batch_script(n)
307 log.msg("starting %d engines" % n)
308 self._temp_file = tempfile.NamedTemporaryFile()
309 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
310 if self.template_file:
311 log.msg("Using %s script %s" % (self.name, self.template_file))
312 contents = open(self.template_file, 'r').read()
313 new_script = contents
314 regex = re.compile(self.job_array_regexp)
315 if not regex.search(contents):
316 log.msg("adding job array settings to %s script" % self.name)
317 new_script = self.job_array_template % n +'\n' + new_script
318 print self.queue_regexp
319 regex = re.compile(self.queue_regexp)
320 print regex.search(contents)
321 if self.queue and not regex.search(contents):
322 log.msg("adding queue settings to %s script" % self.name)
323 new_script = self.queue_template % self.queue + '\n' + new_script
324 if new_script != contents:
325 self._temp_file.write(new_script)
326 self.template_file = self._temp_file.name
327 else:
328 default_script = self.default_template % n
329 if self.queue:
330 default_script = self.queue_template % self.queue + \
331 '\n' + default_script
332 log.msg("using default ipengine %s script: \n%s" %
333 (self.name, default_script))
334 self._temp_file.file.write(default_script)
335 self.template_file = self._temp_file.name
336 self._temp_file.file.close()
313 337 d = getProcessOutput(self.submit_command,
314 [self.batch_file],env=os.environ)
338 [self.template_file],
339 env=os.environ)
315 340 d.addCallback(self.parse_job_id)
316 341 d.addErrback(self.handle_error)
317 342 return d
318
343
319 344 def kill(self):
320 345 d = getProcessOutput(self.delete_command,
321 346 [self.job_id],env=os.environ)
322 347 return d
323 348
324 349 class PBSEngineSet(BatchEngineSet):
325
350
351 name = 'PBS'
326 352 submit_command = 'qsub'
327 353 delete_command = 'qdel'
328 354 job_id_regexp = '\d+'
329
330 def __init__(self, template_file, **kwargs):
331 BatchEngineSet.__init__(self, template_file, **kwargs)
355 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
356 job_array_template = '#PBS -t 1-%d'
357 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
358 queue_template = '#PBS -q %s'
359 default_template="""#!/bin/sh
360 #PBS -V
361 #PBS -t 1-%d
362 #PBS -N ipengine
363 eid=$(($PBS_ARRAYID - 1))
364 ipengine --logfile=ipengine${eid}.log
365 """
366
367 class SGEEngineSet(PBSEngineSet):
368
369 name = 'SGE'
370 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
371 job_array_template = '#$ -t 1-%d'
372 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
373 queue_template = '#$ -q %s'
374 default_template="""#$ -V
375 #$ -S /bin/sh
376 #$ -t 1-%d
377 #$ -N ipengine
378 eid=$(($SGE_TASK_ID - 1))
379 ipengine --logfile=ipengine${eid}.log
380 """
381
382 class LSFEngineSet(PBSEngineSet):
383
384 name = 'LSF'
385 submit_command = 'bsub'
386 delete_command = 'bkill'
387 job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
388 job_array_template = '#BSUB -J ipengine[1-%d]'
389 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
390 queue_template = '#BSUB -q %s'
391 default_template="""#!/bin/sh
392 #BSUB -J ipengine[1-%d]
393 eid=$(($LSB_JOBINDEX - 1))
394 ipengine --logfile=ipengine${eid}.log
395 """
396 bsub_wrapper="""#!/bin/sh
397 bsub < $1
398 """
399
400 def __init__(self, template_file, queue, **kwargs):
401 self._bsub_wrapper = self._make_bsub_wrapper()
402 self.submit_command = self._bsub_wrapper.name
403 PBSEngineSet.__init__(self,template_file, queue, **kwargs)
332 404
405 def _make_bsub_wrapper(self):
406 bsub_wrapper = tempfile.NamedTemporaryFile()
407 bsub_wrapper.write(self.bsub_wrapper)
408 bsub_wrapper.file.close()
409 os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
410 return bsub_wrapper
333 411
334 sshx_template="""#!/bin/sh
335 "$@" &> /dev/null &
412 sshx_template_prefix="""#!/bin/sh
413 """
414 sshx_template_suffix=""""$@" &> /dev/null &
336 415 echo $!
337 416 """
338 417
@@ -340,11 +419,19 b' engine_killer_template="""#!/bin/sh'
340 419 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
341 420 """
342 421
422 def escape_strings(val):
423 val = val.replace('(','\(')
424 val = val.replace(')','\)')
425 if ' ' in val:
426 val = '"%s"'%val
427 return val
428
343 429 class SSHEngineSet(object):
344 sshx_template=sshx_template
430 sshx_template_prefix=sshx_template_prefix
431 sshx_template_suffix=sshx_template_suffix
345 432 engine_killer_template=engine_killer_template
346 433
347 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
434 def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
348 435 """Start a controller on localhost and engines using ssh.
349 436
350 437 The engine_hosts argument is a dict with hostnames as keys and
@@ -363,7 +450,12 b' class SSHEngineSet(object):'
363 450 '%s-main-sshx.sh' % os.environ['USER']
364 451 )
365 452 f = open(self.sshx, 'w')
366 f.writelines(self.sshx_template)
453 f.writelines(self.sshx_template_prefix)
454 if copyenvs:
455 for key, val in sorted(os.environ.items()):
456 newval = escape_strings(val)
457 f.writelines('export %s=%s\n'%(key,newval))
458 f.writelines(self.sshx_template_suffix)
367 459 f.close()
368 460 self.engine_command = ipengine
369 461 self.engine_hosts = engine_hosts
@@ -609,13 +701,17 b' def main_pbs(args):'
609 701 # See if we are reusing FURL files
610 702 if not check_reuse(args, cont_args):
611 703 return
704
705 if args.pbsscript and not os.path.isfile(args.pbsscript):
706 log.err('PBS script does not exist: %s' % args.pbsscript)
707 return
612 708
613 709 cl = ControllerLauncher(extra_args=cont_args)
614 710 dstart = cl.start()
615 711 def start_engines(r):
616 pbs_set = PBSEngineSet(args.pbsscript)
712 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
617 713 def shutdown(signum, frame):
618 log.msg('Stopping pbs cluster')
714 log.msg('Stopping PBS cluster')
619 715 d = pbs_set.kill()
620 716 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
621 717 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
@@ -627,6 +723,72 b' def main_pbs(args):'
627 723 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
628 724 dstart.addErrback(_err_and_stop)
629 725
726 def main_sge(args):
727 cont_args = []
728 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
729
730 # Check security settings before proceeding
731 if not check_security(args, cont_args):
732 return
733
734 # See if we are reusing FURL files
735 if not check_reuse(args, cont_args):
736 return
737
738 if args.sgescript and not os.path.isfile(args.sgescript):
739 log.err('SGE script does not exist: %s' % args.sgescript)
740 return
741
742 cl = ControllerLauncher(extra_args=cont_args)
743 dstart = cl.start()
744 def start_engines(r):
745 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
746 def shutdown(signum, frame):
747 log.msg('Stopping sge cluster')
748 d = sge_set.kill()
749 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
750 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
751 signal.signal(signal.SIGINT,shutdown)
752 d = sge_set.start(args.n)
753 return d
754 config = kernel_config_manager.get_config_obj()
755 furl_file = config['controller']['engine_furl_file']
756 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
757 dstart.addErrback(_err_and_stop)
758
759 def main_lsf(args):
760 cont_args = []
761 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
762
763 # Check security settings before proceeding
764 if not check_security(args, cont_args):
765 return
766
767 # See if we are reusing FURL files
768 if not check_reuse(args, cont_args):
769 return
770
771 if args.lsfscript and not os.path.isfile(args.lsfscript):
772 log.err('LSF script does not exist: %s' % args.lsfscript)
773 return
774
775 cl = ControllerLauncher(extra_args=cont_args)
776 dstart = cl.start()
777 def start_engines(r):
778 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
779 def shutdown(signum, frame):
780 log.msg('Stopping LSF cluster')
781 d = lsf_set.kill()
782 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
783 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
784 signal.signal(signal.SIGINT,shutdown)
785 d = lsf_set.start(args.n)
786 return d
787 config = kernel_config_manager.get_config_obj()
788 furl_file = config['controller']['engine_furl_file']
789 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
790 dstart.addErrback(_err_and_stop)
791
630 792
631 793 def main_ssh(args):
632 794 """Start a controller on localhost and engines using ssh.
@@ -658,7 +820,8 b' def main_ssh(args):'
658 820 cl = ControllerLauncher(extra_args=cont_args)
659 821 dstart = cl.start()
660 822 def start_engines(cont_pid):
661 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
823 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
824 copyenvs=args.copyenvs)
662 825 def shutdown(signum, frame):
663 826 d = ssh_set.kill()
664 827 cl.interrupt_then_kill(1.0)
@@ -765,20 +928,77 b' def get_args():'
765 928 help="how to call MPI_Init (default=mpi4py)"
766 929 )
767 930 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
768
931
769 932 parser_pbs = subparsers.add_parser(
770 'pbs',
933 'pbs',
771 934 help='run a pbs cluster',
772 935 parents=[base_parser]
773 936 )
774 937 parser_pbs.add_argument(
938 '-s',
775 939 '--pbs-script',
776 type=str,
940 type=str,
777 941 dest='pbsscript',
778 942 help='PBS script template',
779 default='pbs.template'
943 default=''
944 )
945 parser_pbs.add_argument(
946 '-q',
947 '--queue',
948 type=str,
949 dest='pbsqueue',
950 help='PBS queue to use when starting the engines',
951 default=None,
780 952 )
781 953 parser_pbs.set_defaults(func=main_pbs)
954
955 parser_sge = subparsers.add_parser(
956 'sge',
957 help='run an sge cluster',
958 parents=[base_parser]
959 )
960 parser_sge.add_argument(
961 '-s',
962 '--sge-script',
963 type=str,
964 dest='sgescript',
965 help='SGE script template',
966 default='' # SGEEngineSet will create one if not specified
967 )
968 parser_sge.add_argument(
969 '-q',
970 '--queue',
971 type=str,
972 dest='sgequeue',
973 help='SGE queue to use when starting the engines',
974 default=None,
975 )
976 parser_sge.set_defaults(func=main_sge)
977
978 parser_lsf = subparsers.add_parser(
979 'lsf',
980 help='run an lsf cluster',
981 parents=[base_parser]
982 )
983
984 parser_lsf.add_argument(
985 '-s',
986 '--lsf-script',
987 type=str,
988 dest='lsfscript',
989 help='LSF script template',
990 default='' # LSFEngineSet will create one if not specified
991 )
992
993 parser_lsf.add_argument(
994 '-q',
995 '--queue',
996 type=str,
997 dest='lsfqueue',
998 help='LSF queue to use when starting the engines',
999 default=None,
1000 )
1001 parser_lsf.set_defaults(func=main_lsf)
782 1002
783 1003 parser_ssh = subparsers.add_parser(
784 1004 'ssh',
@@ -786,6 +1006,14 b' def get_args():'
786 1006 parents=[base_parser]
787 1007 )
788 1008 parser_ssh.add_argument(
1009 '-e',
1010 '--copyenvs',
1011 action='store_true',
1012 dest='copyenvs',
1013 help='Copy current shell environment to remote location',
1014 default=False,
1015 )
1016 parser_ssh.add_argument(
789 1017 '--clusterfile',
790 1018 type=str,
791 1019 dest='clusterfile',
@@ -53,7 +53,9 b' The :command:`ipcluster` command provides a simple way of starting a controller '
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
56 4. When engines are started using the SGE [SGE]_ batch system.
57 5. When engines are started using the LSF [LSF]_ batch system.
58 6. When the controller is started on localhost and the engines are started on
57 59 remote nodes using :command:`ssh`.
58 60
59 61 .. note::
@@ -126,49 +128,115 b' More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.'
126 128 Using :command:`ipcluster` in PBS mode
127 129 --------------------------------------
128 130
129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
131 The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
130 132
131 .. sourcecode:: bash
132
133 #PBS -N ipython
134 #PBS -j oe
135 #PBS -l walltime=00:10:00
136 #PBS -l nodes=${n/4}:ppn=4
137 #PBS -q parallel
133 To start an ipcluster using the Portable Batch System::
138 134
139 cd $$PBS_O_WORKDIR
140 export PATH=$$HOME/usr/local/bin
141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
135 $ ipcluster pbs -n 12
143 136
144 There are a few important points about this template:
137 The above command will launch a PBS job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option::
145 138
146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
147 template engine.
139 $ ipcluster pbs -n 12 -q hpcqueue
148 140
149 2. Instead of putting in the actual number of engines, use the notation
150 ``${n}`` to indicate the number of engines to be started. You can also uses
151 expressions like ``${n/4}`` in the template to indicate the number of
152 nodes.
141 By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option::
153 142
154 3. Because ``$`` is a special character used by the template engine, you must
155 escape any ``$`` by using ``$$``. This is important when referring to
156 environment variables in the template.
143 $ ipcluster pbs -n 12 -q hpcqueue -s mypbscript.sh
157 144
158 4. Any options to :command:`ipengine` should be given in the batch script
159 template.
145 For example the default autogenerated script looks like::
160 146
161 5. Depending on the configuration of you system, you may have to set
162 environment variables in the script template.
147 #PBS -q hpcqueue
148 #!/bin/sh
149 #PBS -V
150 #PBS -t 1-12
151 #PBS -N ipengine
152 eid=$(($PBS_ARRAYID - 1))
153 ipengine --logfile=ipengine${eid}.log
163 154
164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
155 .. note::
165 156
166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
157 ipcluster relies on using PBS job arrays to start the
158 engines. If you specify your own job script without specifying the
159 job array settings ipcluster will automatically add the job array
160 settings (#PBS -t 1-N) to your script.
167 161
168 162 Additional command line options for this mode can be found by doing::
169 163
170 164 $ ipcluster pbs -h
171 165
166 Using :command:`ipcluster` in SGE mode
167 --------------------------------------
168
169 The SGE mode uses the Sun Grid Engine [SGE]_ to start the engines.
170
171 To start an ipcluster using Sun Grid Engine::
172
173 $ ipcluster sge -n 12
174
175 The above command will launch an SGE job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option::
176
177 $ ipcluster sge -n 12 -q hpcqueue
178
179 By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option::
180
181 $ ipcluster sge -n 12 -q hpcqueue -s mysgescript.sh
182
183 For example the default autogenerated script looks like::
184
185 #$ -q hpcqueue
186 #$ -V
187 #$ -S /bin/sh
188 #$ -t 1-12
189 #$ -N ipengine
190 eid=$(($SGE_TASK_ID - 1))
191 ipengine --logfile=ipengine${eid}.log
192
193 .. note::
194
195 ipcluster relies on using SGE job arrays to start the engines. If
196 you specify your own job script without specifying the job array
197 settings ipcluster will automatically add the job array settings (#$ -t
198 1-N) to your script.
199
200 Additional command line options for this mode can be found by doing::
201
202 $ ipcluster sge -h
203
204 Using :command:`ipcluster` in LSF mode
205 --------------------------------------
206
207 The LSF mode uses the Load Sharing Facility [LSF]_ to start the engines.
208
209 To start an ipcluster using the Load Sharing Facility::
210
211 $ ipcluster lsf -n 12
212
213 The above command will launch an LSF job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option::
214
215 $ ipcluster lsf -n 12 -q hpcqueue
216
217 By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option::
218
219 $ ipcluster lsf -n 12 -q hpcqueue -s mylsfscript.sh
220
221 For example the default autogenerated script looks like::
222
223 #BSUB -q hpcqueue
224 #!/bin/sh
225 #BSUB -J ipengine[1-12]
226 eid=$(($LSB_JOBINDEX - 1))
227 ipengine --logfile=ipengine${eid}.log
228
229 .. note::
230
231 ipcluster relies on using LSF job arrays to start the engines. If you
232 specify your own job script without specifying the job array settings
233 ipcluster will automatically add the job array settings (#BSUB -J
234 ipengine[1-N]) to your script.
235
236 Additional command line options for this mode can be found by doing::
237
238 $ ipcluster lsf -h
239
172 240 Using :command:`ipcluster` in SSH mode
173 241 --------------------------------------
174 242
@@ -348,4 +416,6 b' the log files to us will often help us to debug any problems.'
348 416
349 417
350 418 .. [PBS] Portable Batch System. http://www.openpbs.org/
419 .. [SGE] Sun Grid Engine. http://www.sun.com/software/sge/
420 .. [LSF] Load Sharing Facility. http://www.platform.com/
351 421 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now