##// END OF EJS Templates
Merge pull request #3425 from minrk/clusterv...
Brian E. Granger -
r10958:0040c488 merge
parent child Browse files
Show More
@@ -1,618 +1,618
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag, catch_config_error
34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
# Default config file name used by the ipcluster application classes.
default_config_file_name = u'ipcluster_config.py'


# Top-level description of the ipcluster command.
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start --n=4'. For more complex usage
you will typically do 'ipython profile create mycluster --parallel', then edit
configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
"""

# Example invocations, one string per (sub)command.
_main_examples = """
ipcluster start --n=4 # start a 4 node cluster on localhost
ipcluster start -h # show the help string for the start subcmd

ipcluster stop -h # show the help string for the stop subcmd
ipcluster engines -h # show the help string for the engines subcmd
"""

_start_examples = """
ipython profile create mycluster --parallel # create mycluster profile
ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
"""

_stop_examples = """
ipcluster stop --profile=mycluster # stop a running cluster by profile name
"""

_engines_examples = """
ipcluster engines --profile=mycluster --n=4 # start 4 engines only
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is no .pid
# file to be found.
NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Utilities
109 # Utilities
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111
111
def find_launcher_class(clsname, kind):
    """Return a launcher for a given clsname and kind.

    Parameters
    ==========
    clsname : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
        WindowsHPC).
    kind : str
        Either 'EngineSet' or 'Controller'.

    Returns
    =======
    The object that `import_item` resolves the (possibly expanded) dotted
    name to.  Whatever `import_item` raises for an unknown name propagates.
    """
    if '.' not in clsname:
        # not a module, presume it's the raw name in apps.launcher
        if kind and kind not in clsname:
            # doesn't match necessary full class name, assume it's
            # just 'PBS' or 'MPI' prefix:
            clsname = clsname + kind + 'Launcher'
        clsname = 'IPython.parallel.apps.launcher.' + clsname
    klass = import_item(clsname)
    return klass
133
133
134 #-----------------------------------------------------------------------------
134 #-----------------------------------------------------------------------------
135 # Main application
135 # Main application
136 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137
137
# Long descriptions shown by `ipcluster <subcommand> -h`.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>', otherwise
use the '--profile-dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
# Command-line aliases for `ipcluster stop`: `--signal` plus the aliases
# shared by all parallel apps.  base_aliases is applied last, so its entries
# win on any key clash.
stop_aliases = dict(
    signal='IPClusterStop.signal',
)
stop_aliases.update(base_aliases)
174
174
class IPClusterStop(BaseParallelApplication):
    """Application behind the `ipcluster stop` subcommand.

    Reads the cluster's pid from the pid file and stops that process: with
    a configurable signal on POSIX, with `taskkill` on Windows.
    """

    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples
    config_file_name = Unicode(default_config_file_name)

    # NOTE: `signal.SIGINT` is evaluated against the `signal` module before
    # the class attribute of the same name is bound.
    signal = Integer(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED when the pid file cannot be read or the
        recorded pid is not running; otherwise asks the cluster process to
        stop.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name == 'posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                # Only a failed kill removes the pid file here; presumably
                # a signalled cluster cleans up its own -- TODO confirm.
                self.remove_pid_file()
        elif os.name == 'nt':
            try:
                # kill the whole tree.  taskkill's output is discarded via
                # os.devnull rather than PIPE: check_call never reads the
                # pipes, so PIPE could block the child on a full buffer.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
227
227
# Command-line aliases for `ipcluster engines`: the shared base aliases plus
# shortcuts for the IPClusterEngines traits.
engine_aliases = {}
engine_aliases.update(base_aliases)
engine_aliases.update(dict(
    n='IPClusterEngines.n',
    engines='IPClusterEngines.engine_launcher_class',
    daemonize='IPClusterEngines.daemonize',
))

# Command-line flags for `ipcluster engines`: the shared base flags plus
# `--daemonize`.
engine_flags = {}
engine_flags.update(base_flags)

engine_flags.update(dict(
    daemonize=(
        {'IPClusterEngines' : {'daemonize' : True}},
        """run the cluster into the background (not available on Windows)""",
    )
))
class IPClusterEngines(BaseParallelApplication):
    """Application behind the `ipcluster engines` subcommand.

    Builds an EngineSet launcher from `engine_launcher_class`, starts the
    engines from the event loop, and stops the launchers on SIGINT or when
    the engines stop while `early_shutdown` is still non-zero.
    """

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()

    def _classes_default(self):
        """Default for `classes`: ProfileDir plus every EngineSet launcher."""
        from IPython.parallel.apps import launcher
        engine_set_launchers = [
            cls for cls in launcher.all_launchers
            if 'EngineSet' in cls.__name__
        ]
        return [ProfileDir] + engine_set_launchers

    n = Integer(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")

    def _engine_launcher_changed(self, name, old, new):
        """Forward a string assigned to the deprecated trait to the new one."""
        if isinstance(new, basestring):
            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
                    " use engine_launcher_class" % self.__class__.__name__)
            self.engine_launcher_class = new

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify its absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher'.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses [default]
            MPI : use mpiexec to launch engines in an MPI environment
            PBS : use PBS (qsub) to submit engines to a batch queue
            SGE : use SGE (qsub) to submit engines to a batch queue
            LSF : use LSF (bsub) to submit engines to a batch queue
            SSH : use SSH to start engines
                        Note that SSH does *not* move the connection files
                        around, so you will likely have to do this manually
                        unless the machines are on a shared file system.
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterEngines.engine_launcher_class = 'SSH'

        or:

            ipcluster start --engines=MPI

        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Turn on file logging whenever daemonize is enabled."""
        if new:
            self.log_to_file = True

    # Window (in seconds) during which stopping engines count as an "early"
    # shutdown; reset to 0 by engines_started_ok() once it has elapsed.
    early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
    # Set by stop_launchers() so the shutdown sequence runs only once.
    _stopping = False

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """Run base initialization, then install the SIGINT handler and build the launcher."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Replace `engine_launcher` with an instance of `engine_launcher_class`."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def init_signal(self):
        """Route SIGINT to sigint_handler."""
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname, kind=None):
        """import and instantiate a Launcher based on importstring

        Logs a fatal message and exits with status 1 if the class cannot be
        imported.
        """
        try:
            klass = find_launcher_class(clsname, kind)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', config=self.config, log=self.log,
            profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
        )
        return launcher

    def engines_started_ok(self):
        """Mark the early-shutdown window as over."""
        self.log.info("Engines appear to have started successfully")
        self.early_shutdown = 0

    def start_engines(self):
        """Start the engine launcher and arm the early-shutdown timer."""
        # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
        n = getattr(self.engine_launcher, 'engine_count', self.n)
        self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
        # `n` above is only for the log line; the launcher is always handed self.n.
        self.engine_launcher.start(self.n)
        self.engine_launcher.on_stop(self.engines_stopped_early)
        if self.early_shutdown:
            # DelayedCallback takes milliseconds.
            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()

    def engines_stopped_early(self, r):
        """on_stop callback: report an early shutdown, then stop the loop."""
        if self.early_shutdown and not self._stopping:
            self.log.error("""
            Engines shutdown early, they probably failed to connect.

            Check the engine log files for output.

            If your controller and engines are not on the same machine, you probably
            have to instruct the controller to listen on an interface other than localhost.

            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.

            Be sure to read our security docs before instructing your controller to listen on
            a public interface.
            """)
            self.stop_launchers()

        return self.engines_stopped(r)

    def engines_stopped(self, r):
        """Stop the event loop; `r` is ignored."""
        return self.loop.stop()

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's stop() returns, or None when the
        launcher was not running.
        """
        if not self.engine_launcher.running:
            return None
        self.log.info("Stopping Engines...")
        return self.engine_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the engines once, then stop the loop after a 3 second delay."""
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: begin shutting the launchers down."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Delete old engine/controller log files when `clean_logs` is set."""
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        # super(IPBaseParallelApplication, self).start_logging()

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        # Schedule the engine start as the first thing the loop runs.
        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR (interrupted system call) is ignored; any other ZMQ
            # error propagates.
            if e.errno != errno.EINTR:
                raise
438
438
# Command-line aliases for `ipcluster start`: everything `ipcluster engines`
# accepts, plus the controller-related options.
start_aliases = {}
start_aliases.update(engine_aliases)
start_aliases.update(dict(
    delay='IPClusterStart.delay',
    controller='IPClusterStart.controller_launcher_class',
))
# 'clean-logs' contains a hyphen, so it cannot be passed as a dict() keyword.
start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446
446
class IPClusterStart(IPClusterEngines):
    # Application behind the `ipcluster start` subcommand.  On top of what
    # IPClusterEngines provides, it builds and starts a controller launcher,
    # schedules the engines `delay` seconds later, and keeps a pid file for
    # as long as the event loop is running.

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(
        True, config=True,
        help="whether to create the profile_dir if it doesn't exist",
    )

    classes = List()

    def _classes_default(self):
        """Default for `classes`: ProfileDir, IPClusterEngines and every launcher."""
        # Imported here rather than at module level, as in the original code.
        from IPython.parallel.apps import launcher
        return [ProfileDir, IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(
        True, config=True,
        help="whether to cleanup old logs before starting",
    )

    delay = CFloat(
        1., config=True,
        help="delay (in s) between starting the controller and the engines",
    )

    controller_launcher = Any(
        config=True,
        help="Deprecated, use controller_launcher_class",
    )

    def _controller_launcher_changed(self, name, old, new):
        """Forward an old-style string value to `controller_launcher_class`.

        Non-string values (e.g. the launcher object assigned by
        `init_launchers`) are left alone.
        """
        if not isinstance(new, basestring):
            return
        # old 0.11-style config: the launcher was named via this trait
        deprecation = (
            "WARNING: %s.controller_launcher is deprecated as of 0.12,"
            " use controller_launcher_class"
        ) % self.__class__.__name__
        self.log.warn(deprecation)
        self.controller_launcher_class = new

    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses
            MPI : use mpiexec to launch the controller in an MPI universe
            PBS : use PBS (qsub) to submit the controller to a batch queue
            SGE : use SGE (qsub) to submit the controller to a batch queue
            LSF : use LSF (bsub) to submit the controller to a batch queue
            SSH : use SSH to start the controller
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterStart.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """
        )
    reset = Bool(
        False, config=True,
        help="Whether to reset config files as part of '--create'.",
    )

    # `flags` is intentionally not overridden in this class; only the
    # aliases are extended.
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller launcher and the engine-set launcher."""
        self.controller_launcher = self.build_launcher(
            self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(
            self.engine_launcher_class, 'EngineSet')

    def engines_stopped(self, r):
        """Do nothing, so that engines shutting down does not stop everything.

        Overrides the parent's `engines_stopped` with a no-op.
        """
        return None

    def start_controller(self):
        """Register the shutdown callback on the controller launcher and start it."""
        launcher = self.controller_launcher
        self.log.info("Starting Controller with %s", self.controller_launcher_class)
        # `stop_launchers` is handed to the launcher as its on-stop callback.
        launcher.on_stop(self.stop_launchers)
        launcher.start()

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns whatever the launcher's `stop()` returns, or None when there
        is nothing to stop.
        """
        launcher = self.controller_launcher
        if not launcher or not launcher.running:
            return None
        return launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller, then let the parent class stop the rest.

        Does nothing when `_stopping` is already set.
        """
        if self._stopping:
            return
        self.stop_controller()
        super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First find out whether a cluster is already running for this profile.
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            # No usable pid file: nothing to check.
            pass
        else:
            if not self.check_pid(pid):
                # Stale pid file left behind by a process that is gone.
                self.remove_pid_file()
            else:
                already_running = (
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.'
                ) % pid
                self.log.critical(already_running)
                # Exit with an unusual status that other processes can watch
                # for to learn how this one exited.
                self.exit(ALREADY_STARTED)

        # Now log and daemonize.
        self.log.info('Starting ipcluster with [daemon=%r]' % self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        # Controller first, engines `delay` seconds later
        # (DelayedCallback takes milliseconds).
        controller_cb = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        controller_cb.start()
        engines_cb = ioloop.DelayedCallback(
            self.start_engines, 1000*self.delay, self.loop)
        engines_cb.start()

        # Write the new pid file only AFTER our (possibly forked) pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted call is swallowed; any other ZMQ error propagates.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
581
# Dotted-path prefix shared by the subcommand application classes; the
# subcommand table appends 'Start', 'Stop' or 'Engines' to it.
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'
583
583
class IPClusterApp(BaseIPythonApplication):
    # Top-level `ipcluster` application.  It does no work of its own: it
    # dispatches to the subcommand application selected on the command line,
    # or prints usage and exits with status 1 when none was given.

    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # subcommand name -> (dotted path of the application class, help text)
    subcommands = {
        'start': (base + 'Start', start_help),
        'stop': (base + 'Stop', stop_help),
        'engines': (base + 'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit(1) if none."""
        if self.subapp is not None:
            return self.subapp.start()

        # Single-argument print(...) emits the same text as the print
        # statement under Python 2.
        print("No subcommand specified. Must specify one of: %s"%(self.subcommands.keys()))
        print("")
        self.print_description()
        self.print_subcommands()
        self.exit(1)
608
608
def launch_new_instance():
    """Obtain the IPClusterApp instance, initialize it and start it."""
    cluster_app = IPClusterApp.instance()
    cluster_app.initialize()
    cluster_app.start()
614
614
615
615
# Script entry point: run the cluster application when executed directly.
if __name__ == '__main__':
    launch_new_instance()
618
618
General Comments 0
You need to be logged in to leave comments. Login now