##// END OF EJS Templates
Merge pull request #3425 from minrk/clusterv...
Brian E. Granger -
r10958:0040c488 merge
parent child Browse files
Show More
@@ -1,618 +1,618
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.sysinfo import num_cpus
41 41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 42 DottedObjectName)
43 43
44 44 from IPython.parallel.apps.baseapp import (
45 45 BaseParallelApplication,
46 46 PIDFileError,
47 47 base_flags, base_aliases
48 48 )
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Module level variables
53 53 #-----------------------------------------------------------------------------
54 54
55 55
56 56 default_config_file_name = u'ipcluster_config.py'
57 57
58 58
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 62 This command automates the startup of these processes using a wide
63 63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 68 """
69 69
70 70 _main_examples = """
71 71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 72 ipcluster start -h # show the help string for the start subcmd
73 73
74 74 ipcluster stop -h # show the help string for the stop subcmd
75 75 ipcluster engines -h # show the help string for the engines subcmd
76 76 """
77 77
78 78 _start_examples = """
79 79 ipython profile create mycluster --parallel # create mycluster profile
80 80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 81 """
82 82
83 83 _stop_examples = """
84 84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 85 """
86 86
87 87 _engines_examples = """
88 88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 89 """
90 90
91 91
92 92 # Exit codes for ipcluster
93 93
94 94 # This will be the exit code if the ipcluster appears to be running because
95 95 # a .pid file exists
96 96 ALREADY_STARTED = 10
97 97
98 98
99 99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 100 # file to be found.
101 101 ALREADY_STOPPED = 11
102 102
103 103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 104 # file to be found.
105 105 NO_CLUSTER = 12
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Utilities
110 110 #-----------------------------------------------------------------------------
111 111
112 112 def find_launcher_class(clsname, kind):
113 113 """Return a launcher for a given clsname and kind.
114 114
115 115 Parameters
116 116 ==========
117 117 clsname : str
118 118 The full name of the launcher class, either with or without the
119 119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
120 120 WindowsHPC).
121 121 kind : str
122 122 Either 'EngineSet' or 'Controller'.
123 123 """
124 124 if '.' not in clsname:
125 125 # not a module, presume it's the raw name in apps.launcher
126 126 if kind and kind not in clsname:
127 127 # doesn't match necessary full class name, assume it's
128 128 # just 'PBS' or 'MPI' prefix:
129 129 clsname = clsname + kind + 'Launcher'
130 130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 131 klass = import_item(clsname)
132 132 return klass
133 133
134 134 #-----------------------------------------------------------------------------
135 135 # Main application
136 136 #-----------------------------------------------------------------------------
137 137
138 138 start_help = """Start an IPython cluster for parallel computing
139 139
140 140 Start an ipython cluster by its profile name or cluster
141 141 directory. Cluster directories contain configuration, log and
142 142 security related files and are named using the convention
143 143 'profile_<name>' and should be creating using the 'start'
144 144 subcommand of 'ipcluster'. If your cluster directory is in
145 145 the cwd or the ipython directory, you can simply refer to it
146 146 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
147 147 otherwise use the 'profile-dir' option.
148 148 """
149 149 stop_help = """Stop a running IPython cluster
150 150
151 151 Stop a running ipython cluster by its profile name or cluster
152 152 directory. Cluster directories are named using the convention
153 153 'profile_<name>'. If your cluster directory is in
154 154 the cwd or the ipython directory, you can simply refer to it
155 155 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
156 156 use the '--profile-dir' option.
157 157 """
158 158 engines_help = """Start engines connected to an existing IPython cluster
159 159
160 160 Start one or more engines to connect to an existing Cluster
161 161 by profile name or cluster directory.
162 162 Cluster directories contain configuration, log and
163 163 security related files and are named using the convention
164 164 'profile_<name>' and should be creating using the 'start'
165 165 subcommand of 'ipcluster'. If your cluster directory is in
166 166 the cwd or the ipython directory, you can simply refer to it
167 167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
168 168 otherwise use the 'profile-dir' option.
169 169 """
170 170 stop_aliases = dict(
171 171 signal='IPClusterStop.signal',
172 172 )
173 173 stop_aliases.update(base_aliases)
174 174
175 175 class IPClusterStop(BaseParallelApplication):
176 176 name = u'ipcluster'
177 177 description = stop_help
178 178 examples = _stop_examples
179 179 config_file_name = Unicode(default_config_file_name)
180 180
181 181 signal = Integer(signal.SIGINT, config=True,
182 182 help="signal to use for stopping processes.")
183 183
184 184 aliases = Dict(stop_aliases)
185 185
186 186 def start(self):
187 187 """Start the app for the stop subcommand."""
188 188 try:
189 189 pid = self.get_pid_from_file()
190 190 except PIDFileError:
191 191 self.log.critical(
192 192 'Could not read pid file, cluster is probably not running.'
193 193 )
194 194 # Here I exit with a unusual exit status that other processes
195 195 # can watch for to learn how I existed.
196 196 self.remove_pid_file()
197 197 self.exit(ALREADY_STOPPED)
198 198
199 199 if not self.check_pid(pid):
200 200 self.log.critical(
201 201 'Cluster [pid=%r] is not running.' % pid
202 202 )
203 203 self.remove_pid_file()
204 204 # Here I exit with a unusual exit status that other processes
205 205 # can watch for to learn how I existed.
206 206 self.exit(ALREADY_STOPPED)
207 207
208 208 elif os.name=='posix':
209 209 sig = self.signal
210 210 self.log.info(
211 211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
212 212 )
213 213 try:
214 214 os.kill(pid, sig)
215 215 except OSError:
216 216 self.log.error("Stopping cluster failed, assuming already dead.",
217 217 exc_info=True)
218 218 self.remove_pid_file()
219 219 elif os.name=='nt':
220 220 try:
221 221 # kill the whole tree
222 222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
223 223 except (CalledProcessError, OSError):
224 224 self.log.error("Stopping cluster failed, assuming already dead.",
225 225 exc_info=True)
226 226 self.remove_pid_file()
227 227
228 228 engine_aliases = {}
229 229 engine_aliases.update(base_aliases)
230 230 engine_aliases.update(dict(
231 231 n='IPClusterEngines.n',
232 232 engines = 'IPClusterEngines.engine_launcher_class',
233 233 daemonize = 'IPClusterEngines.daemonize',
234 234 ))
235 235 engine_flags = {}
236 236 engine_flags.update(base_flags)
237 237
238 238 engine_flags.update(dict(
239 239 daemonize=(
240 240 {'IPClusterEngines' : {'daemonize' : True}},
241 241 """run the cluster into the background (not available on Windows)""",
242 242 )
243 243 ))
244 244 class IPClusterEngines(BaseParallelApplication):
245 245
246 246 name = u'ipcluster'
247 247 description = engines_help
248 248 examples = _engines_examples
249 249 usage = None
250 250 config_file_name = Unicode(default_config_file_name)
251 251 default_log_level = logging.INFO
252 252 classes = List()
253 253 def _classes_default(self):
254 254 from IPython.parallel.apps import launcher
255 255 launchers = launcher.all_launchers
256 256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
257 257 return [ProfileDir]+eslaunchers
258 258
259 259 n = Integer(num_cpus(), config=True,
260 260 help="""The number of engines to start. The default is to use one for each
261 261 CPU on your machine""")
262 262
263 263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
264 264 def _engine_launcher_changed(self, name, old, new):
265 265 if isinstance(new, basestring):
266 266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
267 267 " use engine_launcher_class" % self.__class__.__name__)
268 268 self.engine_launcher_class = new
269 269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
270 270 config=True,
271 271 help="""The class for launching a set of Engines. Change this value
272 272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
273 273 Each launcher class has its own set of configuration options, for making sure
274 274 it will work in your environment.
275 275
276 276 You can also write your own launcher, and specify it's absolute import path,
277 277 as in 'mymodule.launcher.FTLEnginesLauncher`.
278 278
279 279 IPython's bundled examples include:
280 280
281 281 Local : start engines locally as subprocesses [default]
282 282 MPI : use mpiexec to launch engines in an MPI environment
283 283 PBS : use PBS (qsub) to submit engines to a batch queue
284 284 SGE : use SGE (qsub) to submit engines to a batch queue
285 285 LSF : use LSF (bsub) to submit engines to a batch queue
286 286 SSH : use SSH to start the controller
287 287 Note that SSH does *not* move the connection files
288 288 around, so you will likely have to do this manually
289 289 unless the machines are on a shared file system.
290 290 WindowsHPC : use Windows HPC
291 291
292 292 If you are using one of IPython's builtin launchers, you can specify just the
293 293 prefix, e.g:
294 294
295 295 c.IPClusterEngines.engine_launcher_class = 'SSH'
296 296
297 297 or:
298 298
299 299 ipcluster start --engines=MPI
300 300
301 301 """
302 302 )
303 303 daemonize = Bool(False, config=True,
304 304 help="""Daemonize the ipcluster program. This implies --log-to-file.
305 305 Not available on Windows.
306 306 """)
307 307
308 308 def _daemonize_changed(self, name, old, new):
309 309 if new:
310 310 self.log_to_file = True
311 311
312 312 early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
313 313 _stopping = False
314 314
315 315 aliases = Dict(engine_aliases)
316 316 flags = Dict(engine_flags)
317 317
318 318 @catch_config_error
319 319 def initialize(self, argv=None):
320 320 super(IPClusterEngines, self).initialize(argv)
321 321 self.init_signal()
322 322 self.init_launchers()
323 323
324 324 def init_launchers(self):
325 325 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
326 326
327 327 def init_signal(self):
328 328 # Setup signals
329 329 signal.signal(signal.SIGINT, self.sigint_handler)
330 330
331 331 def build_launcher(self, clsname, kind=None):
332 332 """import and instantiate a Launcher based on importstring"""
333 333 try:
334 334 klass = find_launcher_class(clsname, kind)
335 335 except (ImportError, KeyError):
336 336 self.log.fatal("Could not import launcher class: %r"%clsname)
337 337 self.exit(1)
338 338
339 339 launcher = klass(
340 340 work_dir=u'.', config=self.config, log=self.log,
341 341 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
342 342 )
343 343 return launcher
344 344
345 345 def engines_started_ok(self):
346 346 self.log.info("Engines appear to have started successfully")
347 347 self.early_shutdown = 0
348 348
349 349 def start_engines(self):
350 350 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
351 351 n = getattr(self.engine_launcher, 'engine_count', self.n)
352 352 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
353 353 self.engine_launcher.start(self.n)
354 354 self.engine_launcher.on_stop(self.engines_stopped_early)
355 355 if self.early_shutdown:
356 356 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
357 357
358 358 def engines_stopped_early(self, r):
359 359 if self.early_shutdown and not self._stopping:
360 360 self.log.error("""
361 361 Engines shutdown early, they probably failed to connect.
362 362
363 363 Check the engine log files for output.
364 364
365 365 If your controller and engines are not on the same machine, you probably
366 366 have to instruct the controller to listen on an interface other than localhost.
367 367
368 368 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
369 369
370 370 Be sure to read our security docs before instructing your controller to listen on
371 371 a public interface.
372 372 """)
373 373 self.stop_launchers()
374 374
375 375 return self.engines_stopped(r)
376 376
377 377 def engines_stopped(self, r):
378 378 return self.loop.stop()
379 379
380 380 def stop_engines(self):
381 381 if self.engine_launcher.running:
382 382 self.log.info("Stopping Engines...")
383 383 d = self.engine_launcher.stop()
384 384 return d
385 385 else:
386 386 return None
387 387
388 388 def stop_launchers(self, r=None):
389 389 if not self._stopping:
390 390 self._stopping = True
391 391 self.log.error("IPython cluster: stopping")
392 392 self.stop_engines()
393 393 # Wait a few seconds to let things shut down.
394 394 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
395 395 dc.start()
396 396
397 397 def sigint_handler(self, signum, frame):
398 398 self.log.debug("SIGINT received, stopping launchers...")
399 399 self.stop_launchers()
400 400
401 401 def start_logging(self):
402 402 # Remove old log files of the controller and engine
403 403 if self.clean_logs:
404 404 log_dir = self.profile_dir.log_dir
405 405 for f in os.listdir(log_dir):
406 406 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
407 407 os.remove(os.path.join(log_dir, f))
408 408 # This will remove old log files for ipcluster itself
409 409 # super(IPBaseParallelApplication, self).start_logging()
410 410
411 411 def start(self):
412 412 """Start the app for the engines subcommand."""
413 413 self.log.info("IPython cluster: started")
414 414 # First see if the cluster is already running
415 415
416 416 # Now log and daemonize
417 417 self.log.info(
418 418 'Starting engines with [daemon=%r]' % self.daemonize
419 419 )
420 420 # TODO: Get daemonize working on Windows or as a Windows Server.
421 421 if self.daemonize:
422 422 if os.name=='posix':
423 423 daemonize()
424 424
425 425 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 426 dc.start()
427 427 # Now write the new pid file AFTER our new forked pid is active.
428 428 # self.write_pid_file()
429 429 try:
430 430 self.loop.start()
431 431 except KeyboardInterrupt:
432 432 pass
433 433 except zmq.ZMQError as e:
434 434 if e.errno == errno.EINTR:
435 435 pass
436 436 else:
437 437 raise
438 438
439 439 start_aliases = {}
440 440 start_aliases.update(engine_aliases)
441 441 start_aliases.update(dict(
442 442 delay='IPClusterStart.delay',
443 443 controller = 'IPClusterStart.controller_launcher_class',
444 444 ))
445 445 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446 446
447 447 class IPClusterStart(IPClusterEngines):
448 448
449 449 name = u'ipcluster'
450 450 description = start_help
451 451 examples = _start_examples
452 452 default_log_level = logging.INFO
453 453 auto_create = Bool(True, config=True,
454 454 help="whether to create the profile_dir if it doesn't exist")
455 455 classes = List()
456 456 def _classes_default(self,):
457 457 from IPython.parallel.apps import launcher
458 458 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
459 459
460 460 clean_logs = Bool(True, config=True,
461 461 help="whether to cleanup old logs before starting")
462 462
463 463 delay = CFloat(1., config=True,
464 464 help="delay (in s) between starting the controller and the engines")
465 465
466 466 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
467 467 def _controller_launcher_changed(self, name, old, new):
468 468 if isinstance(new, basestring):
469 469 # old 0.11-style config
470 470 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
471 471 " use controller_launcher_class" % self.__class__.__name__)
472 472 self.controller_launcher_class = new
473 473 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
474 474 config=True,
475 475 help="""The class for launching a Controller. Change this value if you want
476 476 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
477 477
478 478 Each launcher class has its own set of configuration options, for making sure
479 479 it will work in your environment.
480 480
481 481 Note that using a batch launcher for the controller *does not* put it
482 482 in the same batch job as the engines, so they will still start separately.
483 483
484 484 IPython's bundled examples include:
485 485
486 486 Local : start engines locally as subprocesses
487 487 MPI : use mpiexec to launch the controller in an MPI universe
488 488 PBS : use PBS (qsub) to submit the controller to a batch queue
489 489 SGE : use SGE (qsub) to submit the controller to a batch queue
490 490 LSF : use LSF (bsub) to submit the controller to a batch queue
491 491 SSH : use SSH to start the controller
492 492 WindowsHPC : use Windows HPC
493 493
494 494 If you are using one of IPython's builtin launchers, you can specify just the
495 495 prefix, e.g:
496 496
497 497 c.IPClusterStart.controller_launcher_class = 'SSH'
498 498
499 499 or:
500 500
501 501 ipcluster start --controller=MPI
502 502
503 503 """
504 504 )
505 505 reset = Bool(False, config=True,
506 506 help="Whether to reset config files as part of '--create'."
507 507 )
508 508
509 509 # flags = Dict(flags)
510 510 aliases = Dict(start_aliases)
511 511
512 512 def init_launchers(self):
513 513 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
514 514 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
515 515
516 516 def engines_stopped(self, r):
517 517 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
518 518 pass
519 519
520 520 def start_controller(self):
521 521 self.log.info("Starting Controller with %s", self.controller_launcher_class)
522 522 self.controller_launcher.on_stop(self.stop_launchers)
523 523 self.controller_launcher.start()
524 524
525 525 def stop_controller(self):
526 526 # self.log.info("In stop_controller")
527 527 if self.controller_launcher and self.controller_launcher.running:
528 528 return self.controller_launcher.stop()
529 529
530 530 def stop_launchers(self, r=None):
531 531 if not self._stopping:
532 532 self.stop_controller()
533 533 super(IPClusterStart, self).stop_launchers()
534 534
535 535 def start(self):
536 536 """Start the app for the start subcommand."""
537 537 # First see if the cluster is already running
538 538 try:
539 539 pid = self.get_pid_from_file()
540 540 except PIDFileError:
541 541 pass
542 542 else:
543 543 if self.check_pid(pid):
544 544 self.log.critical(
545 545 'Cluster is already running with [pid=%s]. '
546 546 'use "ipcluster stop" to stop the cluster.' % pid
547 547 )
548 548 # Here I exit with a unusual exit status that other processes
549 549 # can watch for to learn how I existed.
550 550 self.exit(ALREADY_STARTED)
551 551 else:
552 552 self.remove_pid_file()
553 553
554 554
555 555 # Now log and daemonize
556 556 self.log.info(
557 557 'Starting ipcluster with [daemon=%r]' % self.daemonize
558 558 )
559 559 # TODO: Get daemonize working on Windows or as a Windows Server.
560 560 if self.daemonize:
561 561 if os.name=='posix':
562 562 daemonize()
563 563
564 564 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
565 565 dc.start()
566 566 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
567 567 dc.start()
568 568 # Now write the new pid file AFTER our new forked pid is active.
569 569 self.write_pid_file()
570 570 try:
571 571 self.loop.start()
572 572 except KeyboardInterrupt:
573 573 pass
574 574 except zmq.ZMQError as e:
575 575 if e.errno == errno.EINTR:
576 576 pass
577 577 else:
578 578 raise
579 579 finally:
580 580 self.remove_pid_file()
581 581
582 582 base='IPython.parallel.apps.ipclusterapp.IPCluster'
583 583
584 class IPClusterApp(Application):
584 class IPClusterApp(BaseIPythonApplication):
585 585 name = u'ipcluster'
586 586 description = _description
587 587 examples = _main_examples
588 588
589 589 subcommands = {
590 590 'start' : (base+'Start', start_help),
591 591 'stop' : (base+'Stop', stop_help),
592 592 'engines' : (base+'Engines', engines_help),
593 593 }
594 594
595 595 # no aliases or flags for parent App
596 596 aliases = Dict()
597 597 flags = Dict()
598 598
599 599 def start(self):
600 600 if self.subapp is None:
601 601 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
602 602 print
603 603 self.print_description()
604 604 self.print_subcommands()
605 605 self.exit(1)
606 606 else:
607 607 return self.subapp.start()
608 608
609 609 def launch_new_instance():
610 610 """Create and run the IPython cluster."""
611 611 app = IPClusterApp.instance()
612 612 app.initialize()
613 613 app.start()
614 614
615 615
616 616 if __name__ == '__main__':
617 617 launch_new_instance()
618 618
General Comments 0
You need to be logged in to leave comments. Login now