add Condor to docs in ipclusterapp
James Booth
@@ -1,619 +1,620 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.sysinfo import num_cpus
41 41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 42 DottedObjectName)
43 43
44 44 from IPython.parallel.apps.baseapp import (
45 45 BaseParallelApplication,
46 46 PIDFileError,
47 47 base_flags, base_aliases
48 48 )
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Module level variables
53 53 #-----------------------------------------------------------------------------
54 54
55 55
56 56 default_config_file_name = u'ipcluster_config.py'
57 57
58 58
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
62 This command automates the startup of these processes using a wide range of
63 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, Condor,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 68 """
69 69
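Since this change adds Condor to the list of advertised launchers, a minimal sketch of the "edit configuration files" step for a Condor-backed profile may help; the profile name, the engine count, and the availability of the bundled Condor launcher classes are assumptions for illustration, not part of this diff:

    # IPYTHONDIR/profile_mycluster/ipcluster_config.py -- sketch only
    c = get_config()

    # number of engines 'ipcluster start' launches by default
    c.IPClusterEngines.n = 4

    # pick launchers by prefix; these are expanded to CondorControllerLauncher /
    # CondorEngineSetLauncher in IPython.parallel.apps.launcher (assumed to be
    # present, as the updated help text implies)
    c.IPClusterStart.controller_launcher_class = 'Condor'
    c.IPClusterEngines.engine_launcher_class = 'Condor'

After editing the file, 'ipcluster start --profile=mycluster --n=4' works as described above.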
70 70 _main_examples = """
71 71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 72 ipcluster start -h # show the help string for the start subcmd
73 73
74 74 ipcluster stop -h # show the help string for the stop subcmd
75 75 ipcluster engines -h # show the help string for the engines subcmd
76 76 """
77 77
78 78 _start_examples = """
79 79 ipython profile create mycluster --parallel # create mycluster profile
80 80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 81 """
82 82
83 83 _stop_examples = """
84 84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 85 """
86 86
87 87 _engines_examples = """
88 88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 89 """
90 90
91 91
92 92 # Exit codes for ipcluster
93 93
94 94 # This will be the exit code if the ipcluster appears to be running because
95 95 # a .pid file exists
96 96 ALREADY_STARTED = 10
97 97
98 98
99 99 # This will be the exit code if ipcluster stop is run, but there is no .pid
100 100 # file to be found.
101 101 ALREADY_STOPPED = 11
102 102
103 103 # This will be the exit code if ipcluster engines is run, but there is no .pid
104 104 # file to be found.
105 105 NO_CLUSTER = 12
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Utilities
110 110 #-----------------------------------------------------------------------------
111 111
112 112 def find_launcher_class(clsname, kind):
113 113 """Return a launcher for a given clsname and kind.
114 114
115 115 Parameters
116 116 ==========
117 117 clsname : str
118 118 The full name of the launcher class, either with or without the
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, Condor,
120 120 WindowsHPC).
121 121 kind : str
122 122 Either 'EngineSet' or 'Controller'.
123 123 """
124 124 if '.' not in clsname:
125 125 # not a module, presume it's the raw name in apps.launcher
126 126 if kind and kind not in clsname:
127 127 # doesn't match necessary full class name, assume it's
128 # just 'PBS' or 'MPI' prefix:
128 # just a 'PBS', 'MPI', etc. prefix:
129 129 clsname = clsname + kind + 'Launcher'
130 130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 131 klass = import_item(clsname)
132 132 return klass
133 133
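A short, hypothetical illustration of the expansion rule implemented above: a bare abbreviation has the kind and 'Launcher' appended and is resolved inside IPython.parallel.apps.launcher, while a dotted path is imported unchanged. The Condor class name is assumed to exist, per the abbreviation list in the docstring:

    from IPython.parallel.apps.ipclusterapp import find_launcher_class

    # 'Condor' + 'EngineSet' + 'Launcher' ->
    # IPython.parallel.apps.launcher.CondorEngineSetLauncher
    klass = find_launcher_class('Condor', 'EngineSet')

    # a dotted name skips the expansion and is imported as-is
    # (placeholder path, as used later in this file's help text):
    # klass = find_launcher_class('mymodule.launcher.FTLEnginesLauncher', 'EngineSet')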
134 134 #-----------------------------------------------------------------------------
135 135 # Main application
136 136 #-----------------------------------------------------------------------------
137 137
138 138 start_help = """Start an IPython cluster for parallel computing
139 139
140 140 Start an ipython cluster by its profile name or cluster
141 141 directory. Cluster directories contain configuration, log and
142 142 security related files and are named using the convention
143 143 'profile_<name>' and should be created using the 'start'
144 144 subcommand of 'ipcluster'. If your cluster directory is in
145 145 the cwd or the ipython directory, you can simply refer to it
146 146 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
147 147 otherwise use the '--profile-dir' option.
148 148 """
149 149 stop_help = """Stop a running IPython cluster
150 150
151 151 Stop a running ipython cluster by its profile name or cluster
152 152 directory. Cluster directories are named using the convention
153 153 'profile_<name>'. If your cluster directory is in
154 154 the cwd or the ipython directory, you can simply refer to it
155 155 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
156 156 use the '--profile-dir' option.
157 157 """
158 158 engines_help = """Start engines connected to an existing IPython cluster
159 159
160 160 Start one or more engines to connect to an existing Cluster
161 161 by profile name or cluster directory.
162 162 Cluster directories contain configuration, log and
163 163 security related files and are named using the convention
164 164 'profile_<name>' and should be created using the 'start'
165 165 subcommand of 'ipcluster'. If your cluster directory is in
166 166 the cwd or the ipython directory, you can simply refer to it
167 167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
168 168 otherwise use the '--profile-dir' option.
169 169 """
170 170 stop_aliases = dict(
171 171 signal='IPClusterStop.signal',
172 172 )
173 173 stop_aliases.update(base_aliases)
174 174
175 175 class IPClusterStop(BaseParallelApplication):
176 176 name = u'ipcluster'
177 177 description = stop_help
178 178 examples = _stop_examples
179 179 config_file_name = Unicode(default_config_file_name)
180 180
181 181 signal = Integer(signal.SIGINT, config=True,
182 182 help="signal to use for stopping processes.")
183 183
184 184 aliases = Dict(stop_aliases)
185 185
186 186 def start(self):
187 187 """Start the app for the stop subcommand."""
188 188 try:
189 189 pid = self.get_pid_from_file()
190 190 except PIDFileError:
191 191 self.log.critical(
192 192 'Could not read pid file, cluster is probably not running.'
193 193 )
194 194 # Here I exit with an unusual exit status that other processes
195 195 # can watch for to learn how I exited.
196 196 self.remove_pid_file()
197 197 self.exit(ALREADY_STOPPED)
198 198
199 199 if not self.check_pid(pid):
200 200 self.log.critical(
201 201 'Cluster [pid=%r] is not running.' % pid
202 202 )
203 203 self.remove_pid_file()
204 204 # Here I exit with an unusual exit status that other processes
205 205 # can watch for to learn how I exited.
206 206 self.exit(ALREADY_STOPPED)
207 207
208 208 elif os.name=='posix':
209 209 sig = self.signal
210 210 self.log.info(
211 211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
212 212 )
213 213 try:
214 214 os.kill(pid, sig)
215 215 except OSError:
216 216 self.log.error("Stopping cluster failed, assuming already dead.",
217 217 exc_info=True)
218 218 self.remove_pid_file()
219 219 elif os.name=='nt':
220 220 try:
221 221 # kill the whole tree
222 222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE, stderr=PIPE)
223 223 except (CalledProcessError, OSError):
224 224 self.log.error("Stopping cluster failed, assuming already dead.",
225 225 exc_info=True)
226 226 self.remove_pid_file()
227 227
228 228 engine_aliases = {}
229 229 engine_aliases.update(base_aliases)
230 230 engine_aliases.update(dict(
231 231 n='IPClusterEngines.n',
232 232 engines = 'IPClusterEngines.engine_launcher_class',
233 233 daemonize = 'IPClusterEngines.daemonize',
234 234 ))
235 235 engine_flags = {}
236 236 engine_flags.update(base_flags)
237 237
238 238 engine_flags.update(dict(
239 239 daemonize=(
240 240 {'IPClusterEngines' : {'daemonize' : True}},
241 241 """run the cluster into the background (not available on Windows)""",
242 242 )
243 243 ))
244 244 class IPClusterEngines(BaseParallelApplication):
245 245
246 246 name = u'ipcluster'
247 247 description = engines_help
248 248 examples = _engines_examples
249 249 usage = None
250 250 config_file_name = Unicode(default_config_file_name)
251 251 default_log_level = logging.INFO
252 252 classes = List()
253 253 def _classes_default(self):
254 254 from IPython.parallel.apps import launcher
255 255 launchers = launcher.all_launchers
256 256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
257 257 return [ProfileDir]+eslaunchers
258 258
259 259 n = Integer(num_cpus(), config=True,
260 260 help="""The number of engines to start. The default is to use one for each
261 261 CPU on your machine""")
262 262
263 263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
264 264 def _engine_launcher_changed(self, name, old, new):
265 265 if isinstance(new, basestring):
266 266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
267 267 " use engine_launcher_class" % self.__class__.__name__)
268 268 self.engine_launcher_class = new
269 269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
270 270 config=True,
271 271 help="""The class for launching a set of Engines. Change this value
272 272 to use various batch systems to launch your engines, such as PBS, SGE, MPI, etc.
273 273 Each launcher class has its own set of configuration options, for making sure
274 274 it will work in your environment.
275 275
276 276 You can also write your own launcher, and specify its absolute import path,
277 277 as in 'mymodule.launcher.FTLEnginesLauncher'.
278 278
279 279 IPython's bundled examples include:
280 280
281 281 Local : start engines locally as subprocesses [default]
282 282 MPI : use mpiexec to launch engines in an MPI environment
283 283 PBS : use PBS (qsub) to submit engines to a batch queue
284 284 SGE : use SGE (qsub) to submit engines to a batch queue
285 285 LSF : use LSF (bsub) to submit engines to a batch queue
286 286 SSH : use SSH to start the engines
287 287 Note that SSH does *not* move the connection files
288 288 around, so you will likely have to do this manually
289 289 unless the machines are on a shared file system.
290 Condor : use HTCondor to submit engines to a batch queue
290 291 WindowsHPC : use Windows HPC
291 292
292 293 If you are using one of IPython's builtin launchers, you can specify just the
293 294 prefix, e.g:
294 295
295 296 c.IPClusterEngines.engine_launcher_class = 'SSH'
296 297
297 298 or:
298 299
299 300 ipcluster start --engines=MPI
300 301
301 302 """
302 303 )
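A hedged sketch of the two configuration forms this help text describes, for ipcluster_config.py; 'mymodule.launcher.FTLEnginesLauncher' is the placeholder from the help above, not a real module:

    c = get_config()

    # builtin launcher by prefix (batch submission of engines via HTCondor,
    # assuming the bundled Condor launchers are present)
    c.IPClusterEngines.engine_launcher_class = 'Condor'

    # or a custom launcher by absolute import path
    # c.IPClusterEngines.engine_launcher_class = 'mymodule.launcher.FTLEnginesLauncher'

The same choice can be made on the command line, e.g. 'ipcluster start --engines=Condor', mirroring the '--engines=MPI' example above.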
303 304 daemonize = Bool(False, config=True,
304 305 help="""Daemonize the ipcluster program. This implies --log-to-file.
305 306 Not available on Windows.
306 307 """)
307 308
308 309 def _daemonize_changed(self, name, old, new):
309 310 if new:
310 311 self.log_to_file = True
311 312
312 313 early_shutdown = Integer(30, config=True, help="The timeout (in seconds) within which early engine shutdown is treated as a failed start")
313 314 _stopping = False
314 315
315 316 aliases = Dict(engine_aliases)
316 317 flags = Dict(engine_flags)
317 318
318 319 @catch_config_error
319 320 def initialize(self, argv=None):
320 321 super(IPClusterEngines, self).initialize(argv)
321 322 self.init_signal()
322 323 self.init_launchers()
323 324
324 325 def init_launchers(self):
325 326 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
326 327
327 328 def init_signal(self):
328 329 # Setup signals
329 330 signal.signal(signal.SIGINT, self.sigint_handler)
330 331
331 332 def build_launcher(self, clsname, kind=None):
332 333 """import and instantiate a Launcher based on importstring"""
333 334 try:
334 335 klass = find_launcher_class(clsname, kind)
335 336 except (ImportError, KeyError):
336 337 self.log.fatal("Could not import launcher class: %r"%clsname)
337 338 self.exit(1)
338 339
339 340 launcher = klass(
340 341 work_dir=u'.', config=self.config, log=self.log,
341 342 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
342 343 )
343 344 return launcher
344 345
345 346 def engines_started_ok(self):
346 347 self.log.info("Engines appear to have started successfully")
347 348 self.early_shutdown = 0
348 349
349 350 def start_engines(self):
350 351 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
351 352 n = getattr(self.engine_launcher, 'engine_count', self.n)
352 353 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
353 354 self.engine_launcher.start(self.n)
354 355 self.engine_launcher.on_stop(self.engines_stopped_early)
355 356 if self.early_shutdown:
356 357 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
357 358
358 359 def engines_stopped_early(self, r):
359 360 if self.early_shutdown and not self._stopping:
360 361 self.log.error("""
361 362 Engines shut down early; they probably failed to connect.
362 363
363 364 Check the engine log files for output.
364 365
365 366 If your controller and engines are not on the same machine, you probably
366 367 have to instruct the controller to listen on an interface other than localhost.
367 368
368 369 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
369 370
370 371 Be sure to read our security docs before instructing your controller to listen on
371 372 a public interface.
372 373 """)
373 374 self.stop_launchers()
374 375
375 376 return self.engines_stopped(r)
376 377
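The "--ip='*'" advice in the error message above would normally be applied in the profile's configuration. A hedged sketch, assuming the default LocalControllerLauncher is in use (other ControllerLauncher classes also expose controller_args, but that is an assumption here), with the argument copied verbatim from the message:

    # ipcluster_config.py -- sketch only
    c = get_config()

    # extra arguments passed through to ipcontroller; the --ip setting makes
    # the controller listen beyond localhost (read the security docs first)
    c.LocalControllerLauncher.controller_args = ["--ip='*'"]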
377 378 def engines_stopped(self, r):
378 379 return self.loop.stop()
379 380
380 381 def stop_engines(self):
381 382 if self.engine_launcher.running:
382 383 self.log.info("Stopping Engines...")
383 384 d = self.engine_launcher.stop()
384 385 return d
385 386 else:
386 387 return None
387 388
388 389 def stop_launchers(self, r=None):
389 390 if not self._stopping:
390 391 self._stopping = True
391 392 self.log.error("IPython cluster: stopping")
392 393 self.stop_engines()
393 394 # Wait a few seconds to let things shut down.
394 395 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
395 396 dc.start()
396 397
397 398 def sigint_handler(self, signum, frame):
398 399 self.log.debug("SIGINT received, stopping launchers...")
399 400 self.stop_launchers()
400 401
401 402 def start_logging(self):
402 403 # Remove old log files of the controller and engine
403 404 if self.clean_logs:
404 405 log_dir = self.profile_dir.log_dir
405 406 for f in os.listdir(log_dir):
406 407 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
407 408 os.remove(os.path.join(log_dir, f))
408 409 # This will remove old log files for ipcluster itself
409 410 # super(IPBaseParallelApplication, self).start_logging()
410 411
411 412 def start(self):
412 413 """Start the app for the engines subcommand."""
413 414 self.log.info("IPython cluster: started")
414 415 # First see if the cluster is already running
415 416
416 417 # Now log and daemonize
417 418 self.log.info(
418 419 'Starting engines with [daemon=%r]' % self.daemonize
419 420 )
420 421 # TODO: Get daemonize working on Windows or as a Windows Server.
421 422 if self.daemonize:
422 423 if os.name=='posix':
423 424 daemonize()
424 425
425 426 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 427 dc.start()
427 428 # Now write the new pid file AFTER our new forked pid is active.
428 429 # self.write_pid_file()
429 430 try:
430 431 self.loop.start()
431 432 except KeyboardInterrupt:
432 433 pass
433 434 except zmq.ZMQError as e:
434 435 if e.errno == errno.EINTR:
435 436 pass
436 437 else:
437 438 raise
438 439
439 440 start_aliases = {}
440 441 start_aliases.update(engine_aliases)
441 442 start_aliases.update(dict(
442 443 delay='IPClusterStart.delay',
443 444 controller = 'IPClusterStart.controller_launcher_class',
444 445 ))
445 446 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446 447
447 448 class IPClusterStart(IPClusterEngines):
448 449
449 450 name = u'ipcluster'
450 451 description = start_help
451 452 examples = _start_examples
452 453 default_log_level = logging.INFO
453 454 auto_create = Bool(True, config=True,
454 455 help="whether to create the profile_dir if it doesn't exist")
455 456 classes = List()
456 457 def _classes_default(self):
457 458 from IPython.parallel.apps import launcher
458 459 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
459 460
460 461 clean_logs = Bool(True, config=True,
461 462 help="whether to cleanup old logs before starting")
462 463
463 464 delay = CFloat(1., config=True,
464 465 help="delay (in s) between starting the controller and the engines")
465 466
466 467 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
467 468 def _controller_launcher_changed(self, name, old, new):
468 469 if isinstance(new, basestring):
469 470 # old 0.11-style config
470 471 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
471 472 " use controller_launcher_class" % self.__class__.__name__)
472 473 self.controller_launcher_class = new
473 474 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
474 475 config=True,
475 476 help="""The class for launching a Controller. Change this value if you want
476 477 your controller to also be launched by a batch system, such as PBS, SGE, MPI, etc.
477 478
478 479 Each launcher class has its own set of configuration options, for making sure
479 480 it will work in your environment.
480 481
481 482 Note that using a batch launcher for the controller *does not* put it
482 483 in the same batch job as the engines, so they will still start separately.
483 484
484 485 IPython's bundled examples include:
485 486
486 487 Local : start the controller locally as a subprocess
487 488 MPI : use mpiexec to launch the controller in an MPI universe
488 489 PBS : use PBS (qsub) to submit the controller to a batch queue
489 490 SGE : use SGE (qsub) to submit the controller to a batch queue
490 491 LSF : use LSF (bsub) to submit the controller to a batch queue
491 492 Condor : use HTCondor to submit the controller to a batch queue
492 493 SSH : use SSH to start the controller
493 494 WindowsHPC : use Windows HPC
494 495
495 496 If you are using one of IPython's builtin launchers, you can specify just the
496 497 prefix, e.g:
497 498
498 499 c.IPClusterStart.controller_launcher_class = 'SSH'
499 500
500 501 or:
501 502
502 503 ipcluster start --controller=MPI
503 504
504 505 """
505 506 )
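One point worth illustrating from the note above: because a batch-launched controller does not share the engines' batch job, the two launcher classes can be chosen independently. A hedged sketch for ipcluster_config.py, using only the bundled prefixes listed above:

    c = get_config()

    # controller started over SSH, engines submitted through HTCondor
    c.IPClusterStart.controller_launcher_class = 'SSH'
    c.IPClusterEngines.engine_launcher_class = 'Condor'

Equivalently on the command line: 'ipcluster start --controller=SSH --engines=Condor'.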
506 507 reset = Bool(False, config=True,
507 508 help="Whether to reset config files as part of '--create'."
508 509 )
509 510
510 511 # flags = Dict(flags)
511 512 aliases = Dict(start_aliases)
512 513
513 514 def init_launchers(self):
514 515 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
515 516 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
516 517
517 518 def engines_stopped(self, r):
518 519 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
519 520 pass
520 521
521 522 def start_controller(self):
522 523 self.log.info("Starting Controller with %s", self.controller_launcher_class)
523 524 self.controller_launcher.on_stop(self.stop_launchers)
524 525 self.controller_launcher.start()
525 526
526 527 def stop_controller(self):
527 528 # self.log.info("In stop_controller")
528 529 if self.controller_launcher and self.controller_launcher.running:
529 530 return self.controller_launcher.stop()
530 531
531 532 def stop_launchers(self, r=None):
532 533 if not self._stopping:
533 534 self.stop_controller()
534 535 super(IPClusterStart, self).stop_launchers()
535 536
536 537 def start(self):
537 538 """Start the app for the start subcommand."""
538 539 # First see if the cluster is already running
539 540 try:
540 541 pid = self.get_pid_from_file()
541 542 except PIDFileError:
542 543 pass
543 544 else:
544 545 if self.check_pid(pid):
545 546 self.log.critical(
546 547 'Cluster is already running with [pid=%s]. '
547 548 'Use "ipcluster stop" to stop the cluster.' % pid
548 549 )
549 550 # Here I exit with an unusual exit status that other processes
550 551 # can watch for to learn how I exited.
551 552 self.exit(ALREADY_STARTED)
552 553 else:
553 554 self.remove_pid_file()
554 555
555 556
556 557 # Now log and daemonize
557 558 self.log.info(
558 559 'Starting ipcluster with [daemon=%r]' % self.daemonize
559 560 )
560 561 # TODO: Get daemonize working on Windows or as a Windows Server.
561 562 if self.daemonize:
562 563 if os.name=='posix':
563 564 daemonize()
564 565
565 566 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
566 567 dc.start()
567 568 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
568 569 dc.start()
569 570 # Now write the new pid file AFTER our new forked pid is active.
570 571 self.write_pid_file()
571 572 try:
572 573 self.loop.start()
573 574 except KeyboardInterrupt:
574 575 pass
575 576 except zmq.ZMQError as e:
576 577 if e.errno == errno.EINTR:
577 578 pass
578 579 else:
579 580 raise
580 581 finally:
581 582 self.remove_pid_file()
582 583
583 584 base='IPython.parallel.apps.ipclusterapp.IPCluster'
584 585
585 586 class IPClusterApp(BaseIPythonApplication):
586 587 name = u'ipcluster'
587 588 description = _description
588 589 examples = _main_examples
589 590
590 591 subcommands = {
591 592 'start' : (base+'Start', start_help),
592 593 'stop' : (base+'Stop', stop_help),
593 594 'engines' : (base+'Engines', engines_help),
594 595 }
595 596
596 597 # no aliases or flags for parent App
597 598 aliases = Dict()
598 599 flags = Dict()
599 600
600 601 def start(self):
601 602 if self.subapp is None:
602 603 print "No subcommand specified. Must specify one of: %s" % (self.subcommands.keys())
603 604 print
604 605 self.print_description()
605 606 self.print_subcommands()
606 607 self.exit(1)
607 608 else:
608 609 return self.subapp.start()
609 610
610 611 def launch_new_instance():
611 612 """Create and run the IPython cluster."""
612 613 app = IPClusterApp.instance()
613 614 app.initialize()
614 615 app.start()
615 616
616 617
617 618 if __name__ == '__main__':
618 619 launch_new_instance()
619 620