##// END OF EJS Templates
fix typo in ipcluster help
MinRK -
Show More
@@ -1,529 +1,529 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.sysinfo import num_cpus
41 41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 42 DottedObjectName)
43 43
44 44 from IPython.parallel.apps.baseapp import (
45 45 BaseParallelApplication,
46 46 PIDFileError,
47 47 base_flags, base_aliases
48 48 )
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Module level variables
53 53 #-----------------------------------------------------------------------------
54 54
55 55
56 56 default_config_file_name = u'ipcluster_config.py'
57 57
58 58
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 62 This command automates the startup of these processes using a wide
63 63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 68 """
69 69
70 70 _main_examples = """
71 71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 72 ipcluster start -h # show the help string for the start subcmd
73 73
74 74 ipcluster stop -h # show the help string for the stop subcmd
75 75 ipcluster engines -h # show the help string for the engines subcmd
76 76 """
77 77
78 78 _start_examples = """
79 79 ipython profile create mycluster --parallel # create mycluster profile
80 80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 81 """
82 82
83 83 _stop_examples = """
84 84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 85 """
86 86
87 87 _engines_examples = """
88 88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 89 """
90 90
91 91
92 92 # Exit codes for ipcluster
93 93
94 94 # This will be the exit code if the ipcluster appears to be running because
95 95 # a .pid file exists
96 96 ALREADY_STARTED = 10
97 97
98 98
99 99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 100 # file to be found.
101 101 ALREADY_STOPPED = 11
102 102
103 103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 104 # file to be found.
105 105 NO_CLUSTER = 12
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
# Help text for the three ipcluster subcommands.  Each string is used both as
# the subcommand application's description and as the short help in the
# parent application's subcommand table.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>', otherwise
use the '--profile-dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
# Command-line aliases for the stop subcommand: its own ``signal`` alias
# first, then every entry of ``base_aliases`` merged in (an entry from
# ``base_aliases`` wins if the same key exists in both).
stop_aliases = {'signal': 'IPClusterStop.signal'}
stop_aliases.update(base_aliases)
147 147
class IPClusterStop(BaseParallelApplication):
    """Application behind ``ipcluster stop``.

    Reads the pid recorded in the cluster's pid file and, when that process
    is still alive, asks it to stop: with ``os.kill`` and the configured
    signal on POSIX, or with ``taskkill`` on Windows.
    """
    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples
    config_file_name = Unicode(default_config_file_name)

    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ``ALREADY_STOPPED`` when the pid file cannot be read or
        the recorded pid is not running.  Otherwise the process is signalled;
        if signalling fails the error is logged and the pid file is removed.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree
                # The output is discarded through the null device rather than
                # PIPE: check_call never reads its pipes, so a PIPE could
                # fill up and block the child.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
200 200
# Aliases for the engines subcommand: a copy of the shared base aliases plus
# the engine-specific shortcuts.
engine_aliases = dict(base_aliases)
engine_aliases.update(
    n='IPClusterEngines.n',
    engines='IPClusterEngines.engine_launcher_class',
    daemonize='IPClusterEngines.daemonize',
)

# Flags for the engines subcommand: a copy of the shared base flags plus a
# ``daemonize`` flag that sets IPClusterEngines.daemonize to True.
engine_flags = dict(base_flags)
engine_flags['daemonize'] = (
    {'IPClusterEngines' : {'daemonize' : True}},
    """run the cluster into the background (not available on Windows)""",
)
class IPClusterEngines(BaseParallelApplication):
    """Application behind ``ipcluster engines``.

    Builds the launcher named by ``engine_launcher_class``, schedules it to
    start ``n`` engines for the current profile directory, and runs the event
    loop until the launcher reports that it stopped or SIGINT is received.
    """

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()
    def _classes_default(self):
        """Return ProfileDir plus every launcher class whose name contains 'EngineSet'."""
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir]+eslaunchers

    n = Int(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify its absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher'.

        Examples include:

            LocalEngineSetLauncher : start engines locally as subprocesses [default]
            MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
            PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
            SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
            SSHEngineSetLauncher : use SSH to start the engines
                                   Note that SSH does *not* move the connection files
                                   around, so you will likely have to do this manually
                                   unless the machines are on a shared file system.
            WindowsHPCEngineSetLauncher : use Windows HPC
        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Change hook for ``daemonize``: enabling it also enables ``log_to_file``."""
        if new:
            self.log_to_file = True

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)
    # Guards stop_launchers() so the shutdown sequence only runs once.
    _stopping = False

    def initialize(self, argv=None):
        """Run the base initialization, then install the SIGINT handler and build the launchers."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Build the engine launcher and stop the event loop when it stops."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        """Route SIGINT to sigint_handler."""
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """import and instantiate a Launcher based on importstring

        A name without a dot is looked up in ``IPython.parallel.apps.launcher``.
        If the class cannot be imported, the failure is logged and the
        application exits with status 1.
        """
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            clsname = 'IPython.parallel.apps.launcher.'+clsname
        try:
            klass = import_item(clsname)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', config=self.config, log=self.log
        )
        return launcher

    def start_engines(self):
        """Ask the engine launcher to start ``n`` engines for the profile directory."""
        self.log.info("Starting %i engines"%self.n)
        self.engine_launcher.start(
            self.n,
            self.profile_dir.location
        )

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when the
        launcher is not running.
        """
        self.log.info("Stopping Engines...")
        if self.engine_launcher.running:
            return self.engine_launcher.stop()
        return None

    def stop_launchers(self, r=None):
        """Stop the engines once, then stop the event loop after a delay.

        ``r`` is accepted so the method can be used as a callback; it is
        not used.
        """
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: begin the shutdown sequence."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old controller/engine log files when ``clean_logs`` is set."""
        # NOTE(review): ``clean_logs`` is not declared on this class in this
        # file (IPClusterStart declares it) -- presumably a base class also
        # provides it; confirm.
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                # NOTE(review): the pattern only matches names of the form
                # 'ipenginez-<n>.<ext>' / 'ipcontrollerz-<n>.<ext>'; confirm
                # the trailing 'z' still matches the log names in use.
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # Start the engines as soon as the event loop is running.
        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # No pid file is written by this subcommand.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is treated like a normal stop;
            # any other zmq error is re-raised.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
367 367
# Aliases for the start subcommand: everything the engines subcommand
# accepts, plus the controller-specific shortcuts.
start_aliases = dict(engine_aliases)
start_aliases.update({
    'delay': 'IPClusterStart.delay',
    'controller': 'IPClusterStart.controller_launcher_class',
    'clean-logs': 'IPClusterStart.clean_logs',
})

# set inherited Start keys directly, to ensure command-line args get higher priority
# than config file options.
for key, value in list(start_aliases.items()):
    if not value.startswith('IPClusterEngines'):
        continue
    start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
381 381
class IPClusterStart(IPClusterEngines):
    """Application behind ``ipcluster start``.

    Extends IPClusterEngines with a controller launcher: the controller is
    started first, the engines ``delay`` seconds later, and a pid file is
    kept for as long as the event loop runs.
    """

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        """Return ProfileDir, IPClusterEngines and every known launcher class."""
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    # The keyword below was previously misspelled ``helep``, so this text was
    # not passed as the trait's ``help`` like every other trait in this file.
    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Examples include:

            LocalControllerLauncher : start the controller locally as a subprocess
            MPIExecControllerLauncher : use mpiexec to launch the controller in an MPI universe
            PBSControllerLauncher : use PBS (qsub) to submit the controller to a batch queue
            SGEControllerLauncher : use SGE (qsub) to submit the controller to a batch queue
            SSHControllerLauncher : use SSH to start the controller
            WindowsHPCControllerLauncher : use Windows HPC
        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller and engine launchers.

        When the controller launcher stops, the whole cluster is shut down
        through stop_launchers.
        """
        self.controller_launcher = self.build_launcher(self.controller_launcher_class)
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Ask the controller launcher to start for the profile directory."""
        self.controller_launcher.start(
            self.profile_dir.location
        )

    def stop_controller(self):
        """Stop the controller launcher if one exists and is running.

        Returns whatever the launcher's ``stop()`` returns, or None otherwise.
        """
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller, then run the inherited engine shutdown, once.

        ``r`` is accepted so the method can be used as an ``on_stop``
        callback; it is not used.
        """
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand.

        Exits with ``ALREADY_STARTED`` if the pid file names a live process;
        a pid file naming a dead process is removed and startup continues.
        """
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with an unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                self.remove_pid_file()


        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # Controller first, engines ``delay`` seconds (converted to ms) later.
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is treated like a normal stop;
            # any other zmq error is re-raised.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        finally:
            # Always clear the pid file once the loop has ended.
            self.remove_pid_file()
492 492
# Import-string prefix shared by the three subcommand application classes.
base='IPython.parallel.apps.ipclusterapp.IPCluster'

class IPClusterApp(Application):
    """Top-level ``ipcluster`` application; it only dispatches to a subcommand."""
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # subcommand name -> (import string of its application class, help text)
    subcommands = {
        'start' : ('%sStart' % base, start_help),
        'stop' : ('%sStop' % base, stop_help),
        'engines' : ('%sEngines' % base, engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1."""
        if self.subapp is not None:
            return self.subapp.start()

        # No subcommand was given: list the valid ones and bail out.
        known = self.subcommands.keys()
        print("No subcommand specified. Must specify one of: %s" % known)
        print("")
        self.print_description()
        self.print_subcommands()
        self.exit(1)
519 519
def launch_new_instance():
    """Entry point: obtain the IPClusterApp instance, initialize it and start it."""
    cluster_app = IPClusterApp.instance()
    cluster_app.initialize()
    cluster_app.start()


# Allow running this module directly as a script.
if __name__ == '__main__':
    launch_new_instance()
529 529
General Comments 0
You need to be logged in to leave comments. Login now