##// END OF EJS Templates
Provide logging messages in ipcluster log when engine or controllers fail to start (chapmanb/bcbio-nextgen#298)
chapmanb -
Show More
@@ -1,610 +1,618 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12 from __future__ import print_function
12 from __future__ import print_function
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Copyright (C) 2008-2011 The IPython Development Team
15 # Copyright (C) 2008-2011 The IPython Development Team
16 #
16 #
17 # Distributed under the terms of the BSD License. The full license is in
17 # Distributed under the terms of the BSD License. The full license is in
18 # the file COPYING, distributed as part of this software.
18 # the file COPYING, distributed as part of this software.
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20
20
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22 # Imports
22 # Imports
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24
24
25 import errno
25 import errno
26 import logging
26 import logging
27 import os
27 import os
28 import re
28 import re
29 import signal
29 import signal
30
30
31 from subprocess import check_call, CalledProcessError, PIPE
31 from subprocess import check_call, CalledProcessError, PIPE
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34
34
35 from IPython.config.application import Application, boolean_flag, catch_config_error
35 from IPython.config.application import Application, boolean_flag, catch_config_error
36 from IPython.config.loader import Config
36 from IPython.config.loader import Config
37 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.application import BaseIPythonApplication
38 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
39 from IPython.utils.daemonize import daemonize
39 from IPython.utils.daemonize import daemonize
40 from IPython.utils.importstring import import_item
40 from IPython.utils.importstring import import_item
41 from IPython.utils.py3compat import string_types
41 from IPython.utils.py3compat import string_types
42 from IPython.utils.sysinfo import num_cpus
42 from IPython.utils.sysinfo import num_cpus
43 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
43 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
44 DottedObjectName)
44 DottedObjectName)
45
45
46 from IPython.parallel.apps.baseapp import (
46 from IPython.parallel.apps.baseapp import (
47 BaseParallelApplication,
47 BaseParallelApplication,
48 PIDFileError,
48 PIDFileError,
49 base_flags, base_aliases
49 base_flags, base_aliases
50 )
50 )
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Module level variables
54 # Module level variables
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57
57
# Description and example snippets for the ipcluster command and its
# subcommands; the per-subcommand examples are attached to the application
# classes as their ``examples`` attribute.
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide range of
startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start --n=4'. For more complex usage
you will typically do 'ipython profile create mycluster --parallel', then edit
configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
"""

_main_examples = """
ipcluster start --n=4 # start a 4 node cluster on localhost
ipcluster start -h # show the help string for the start subcmd

ipcluster stop -h # show the help string for the stop subcmd
ipcluster engines -h # show the help string for the engines subcmd
"""

_start_examples = """
ipython profile create mycluster --parallel # create mycluster profile
ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
"""

_stop_examples = """
ipcluster stop --profile=mycluster # stop a running cluster by profile name
"""

_engines_examples = """
ipcluster engines --profile=mycluster --n=4 # start 4 engines only
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is no .pid
# file to be found.
NO_CLUSTER = 12
105
105
106
106
107 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
108 # Utilities
108 # Utilities
109 #-----------------------------------------------------------------------------
109 #-----------------------------------------------------------------------------
110
110
def find_launcher_class(clsname, kind):
    """Import and return the launcher class named by `clsname`.

    Parameters
    ----------
    clsname : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor,
        WindowsHPC).
    kind : str
        Either 'EngineSet' or 'Controller'.

    Returns
    -------
    The object produced by ``import_item`` for the resolved dotted name.
    """
    if '.' in clsname:
        # Already a dotted import path: use it untouched.
        dotted_name = clsname
    else:
        # A bare name is looked up in IPython.parallel.apps.launcher.
        short_name = clsname
        if kind and kind not in short_name:
            # Only a prefix such as 'PBS' or 'MPI' was given; expand it to
            # the full class name, e.g. 'PBS' + 'EngineSet' + 'Launcher'.
            short_name = short_name + kind + 'Launcher'
        dotted_name = 'IPython.parallel.apps.launcher.' + short_name
    return import_item(dotted_name)
132
132
133 #-----------------------------------------------------------------------------
133 #-----------------------------------------------------------------------------
134 # Main application
134 # Main application
135 #-----------------------------------------------------------------------------
135 #-----------------------------------------------------------------------------
136
136
# Help text for the ``start``, ``stop`` and ``engines`` subcommands; each is
# used as the ``description`` of the corresponding application class.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>', otherwise
use the '--profile-dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
# Command-line aliases for ``ipcluster stop``: the subcommand's own ``signal``
# alias first, then everything from ``base_aliases`` layered on top.
stop_aliases = {'signal': 'IPClusterStop.signal'}
stop_aliases.update(base_aliases)
173
173
class IPClusterStop(BaseParallelApplication):
    """Application for the ``ipcluster stop`` subcommand.

    Reads the cluster pid via ``get_pid_from_file()`` and asks that process
    to stop: with ``os.kill`` and the configured signal on POSIX, or with
    ``taskkill`` on Windows. Exits with ``ALREADY_STOPPED`` when the pid file
    cannot be read or the pid is not running.
    """
    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples

    signal = Integer(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand."""
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical('Cluster [pid=%r] is not running.', pid)
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]", pid, sig
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree
                # Output is discarded through os.devnull rather than PIPE:
                # check_call never reads from pipes, so a chatty child could
                # block on a full pipe buffer.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
225
225
# Command-line aliases for ``ipcluster engines``: everything in
# ``base_aliases`` plus the engine-specific options.
engine_aliases = dict(base_aliases)
engine_aliases.update({
    'n': 'IPClusterEngines.n',
    'engines': 'IPClusterEngines.engine_launcher_class',
    'daemonize': 'IPClusterEngines.daemonize',
})

# Command-line flags for ``ipcluster engines``: everything in ``base_flags``
# plus ``--daemonize``.
engine_flags = dict(base_flags)
engine_flags['daemonize'] = (
    {'IPClusterEngines' : {'daemonize' : True}},
    """run the cluster into the background (not available on Windows)""",
)
class IPClusterEngines(BaseParallelApplication):
    """Application for the ``ipcluster engines`` subcommand.

    Builds an EngineSet launcher from ``engine_launcher_class``, starts the
    engines from inside the event loop, and runs ``self.loop`` until the
    engines stop or SIGINT triggers ``stop_launchers``.
    """

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    default_log_level = logging.INFO
    classes = List()
    def _classes_default(self):
        """Default ``classes``: ProfileDir plus every EngineSet launcher."""
        from IPython.parallel.apps import launcher
        eslaunchers = [cls for cls in launcher.all_launchers
                       if 'EngineSet' in cls.__name__]
        return [ProfileDir]+eslaunchers

    n = Integer(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
    def _engine_launcher_changed(self, name, old, new):
        """Forward old string-valued config to ``engine_launcher_class``."""
        if isinstance(new, string_types):
            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
                    " use engine_launcher_class" % self.__class__.__name__)
            self.engine_launcher_class = new
    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify its absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher'.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses [default]
            MPI : use mpiexec to launch engines in an MPI environment
            PBS : use PBS (qsub) to submit engines to a batch queue
            SGE : use SGE (qsub) to submit engines to a batch queue
            LSF : use LSF (bsub) to submit engines to a batch queue
            SSH : use SSH to start the controller
                        Note that SSH does *not* move the connection files
                        around, so you will likely have to do this manually
                        unless the machines are on a shared file system.
            HTCondor : use HTCondor to submit engines to a batch queue
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterEngines.engine_launcher_class = 'SSH'

        or:

            ipcluster start --engines=MPI

        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Turn on ``log_to_file`` whenever daemonize is enabled."""
        if new:
            self.log_to_file = True

    early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
    # Set once by stop_launchers() so shutdown is only initiated one time.
    _stopping = False

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """Run the base initialization, then install SIGINT handling and build the launcher."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Instantiate the EngineSet launcher named by ``engine_launcher_class``."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def init_signal(self):
        """Route SIGINT to ``sigint_handler``."""
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname, kind=None):
        """import and instantiate a Launcher based on importstring"""
        try:
            klass = find_launcher_class(clsname, kind)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r", clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', parent=self, log=self.log,
            profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
        )
        return launcher

    def engines_started_ok(self):
        """Record that the engines survived the early-shutdown window."""
        self.log.info("Engines appear to have started successfully")
        # Zero disables the early-shutdown handling in engines_stopped_early().
        self.early_shutdown = 0

    def start_engines(self):
        """Start the engines and arm the early-shutdown check.

        Any exception raised by the launcher is written to the ipcluster log
        with its traceback and then re-raised.
        """
        # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
        n = getattr(self.engine_launcher, 'engine_count', self.n)
        self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
        try:
            self.engine_launcher.start(self.n)
        except Exception:
            self.log.exception("Engine start failed")
            raise
        self.engine_launcher.on_stop(self.engines_stopped_early)
        if self.early_shutdown:
            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()

    def engines_stopped_early(self, r):
        """on_stop callback: explain an early engine exit, then stop the loop."""
        if self.early_shutdown and not self._stopping:
            self.log.error("""
            Engines shutdown early, they probably failed to connect.

            Check the engine log files for output.

            If your controller and engines are not on the same machine, you probably
            have to instruct the controller to listen on an interface other than localhost.

            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.

            Be sure to read our security docs before instructing your controller to listen on
            a public interface.
            """)
            self.stop_launchers()

        return self.engines_stopped(r)

    def engines_stopped(self, r):
        """Stop the event loop."""
        return self.loop.stop()

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when the
        launcher was not running.
        """
        if not self.engine_launcher.running:
            return None
        self.log.info("Stopping Engines...")
        return self.engine_launcher.stop()

    def stop_launchers(self, r=None):
        """Begin shutdown once: stop the engines and schedule a loop stop."""
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: initiate an orderly shutdown."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old controller/engine log files when ``clean_logs`` is set."""
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                # Anchored at the end so only names that really finish in
                # .log/.err/.out are deleted (not e.g. '....log.bak').
                if re.match(r'ip(engine|controller)-.+\.(log|err|out)$',f):
                    os.remove(os.path.join(log_dir, f))

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info('Starting engines with [daemon=%r]', self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # Start the engines from inside the event loop, as soon as it runs.
        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # NOTE: no pid file is written by this subcommand.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is ignored; anything else propagates.
            if e.errno != errno.EINTR:
                raise
434
438
# Command-line aliases for ``ipcluster start``: the engine aliases plus the
# options that only apply when a controller is launched as well.
start_aliases = dict(engine_aliases)
start_aliases.update({
    'delay': 'IPClusterStart.delay',
    'controller': 'IPClusterStart.controller_launcher_class',
    'clean-logs': 'IPClusterStart.clean_logs',
})
442
446
class IPClusterStart(IPClusterEngines):
    # Application for the ``start`` subcommand.  It extends IPClusterEngines
    # with a controller launcher: ``start`` schedules ``start_controller``
    # immediately and ``start_engines`` after ``delay`` seconds on the loop.

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        """Return the configurable classes exposed by this application."""
        # Imported here rather than at module level, as in the original code.
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
    def _controller_launcher_changed(self, name, old, new):
        """Forward old string-valued ``controller_launcher`` config.

        When the new value is a string (0.11-style configuration), warn about
        the deprecation and copy the value to ``controller_launcher_class``.
        Non-string values (launcher instances set by ``init_launchers``) are
        left alone.
        """
        if isinstance(new, string_types):
            # old 0.11-style config
            self.log.warning("WARNING: %s.controller_launcher is deprecated as of 0.12,"
                    " use controller_launcher_class", self.__class__.__name__)
            self.controller_launcher_class = new
    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses
            MPI : use mpiexec to launch the controller in an MPI universe
            PBS : use PBS (qsub) to submit the controller to a batch queue
            SGE : use SGE (qsub) to submit the controller to a batch queue
            LSF : use LSF (bsub) to submit the controller to a batch queue
            HTCondor : use HTCondor to submit the controller to a batch queue
            SSH : use SSH to start the controller
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterStart.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller launcher and the engine-set launcher."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def engines_stopped(self, r):
        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
        pass

    def start_controller(self):
        """Start the controller launcher, logging any failure before re-raising.

        ``stop_launchers`` is registered as the launcher's on-stop callback
        before the launcher is started.
        """
        self.log.info("Starting Controller with %s", self.controller_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)
        try:
            self.controller_launcher.start()
        except Exception:
            # Record the traceback in the ipcluster log, then propagate.
            # KeyboardInterrupt/SystemExit are deliberately not reported as a
            # failed controller start; they propagate untouched.
            self.log.exception("Controller start failed")
            raise

    def stop_controller(self):
        """Stop the controller launcher if it exists and is running.

        Returns whatever the launcher's ``stop()`` returns, or None when
        there is nothing to stop.
        """
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller and then the parent's launchers.

        Does nothing when ``self._stopping`` is already set.  ``r`` is
        accepted so the method can be used as an on-stop callback; it is
        not used.
        """
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.', pid
                )
                # Here I exit with an unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                # Stale pid file: the recorded process is gone.
                self.remove_pid_file()

        # Now log and daemonize
        self.log.info('Starting ipcluster with [daemon=%r]', self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        # Controller first (no delay), engines ``delay`` seconds later;
        # DelayedCallback takes its delay in milliseconds.
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is treated like a normal shutdown.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
578
586
# Dotted-path prefix to which each subcommand's class-name suffix is appended.
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'

class IPClusterApp(BaseIPythonApplication):
    # Top-level ``ipcluster`` application: it only dispatches to a subcommand.
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # Maps subcommand name -> (dotted path of the application class, help text).
    subcommands = {
                'start' : (base+'Start', start_help),
                'stop' : (base+'Stop', stop_help),
                'engines' : (base+'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1.

        Returns the result of the subapp's ``start()`` when a subcommand was
        given on the command line.
        """
        if self.subapp is not None:
            return self.subapp.start()

        # A sorted list renders identically on Python 2 and 3; a raw
        # ``dict.keys()`` prints as ``dict_keys([...])`` on Python 3.
        print("No subcommand specified. Must specify one of: %s"
              % (sorted(self.subcommands),))
        print()
        self.print_description()
        self.print_subcommands()
        self.exit(1)
605
613
# Module-level entry point: an alias for IPClusterApp.launch_instance.
launch_new_instance = IPClusterApp.launch_instance

if __name__ == "__main__":
    # Executed as a script: run the ipcluster application.
    launch_new_instance()
610
618
General Comments 0
You need to be logged in to leave comments. Login now