##// END OF EJS Templates
Merge pull request #4820 from minrk/super-old-z...
Matthias Bussonnier -
r14684:03d9fde2 merge
parent child Browse files
Show More
@@ -1,612 +1,610 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12 from __future__ import print_function
12 from __future__ import print_function
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Copyright (C) 2008-2011 The IPython Development Team
15 # Copyright (C) 2008-2011 The IPython Development Team
16 #
16 #
17 # Distributed under the terms of the BSD License. The full license is in
17 # Distributed under the terms of the BSD License. The full license is in
18 # the file COPYING, distributed as part of this software.
18 # the file COPYING, distributed as part of this software.
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20
20
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22 # Imports
22 # Imports
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24
24
25 import errno
25 import errno
26 import logging
26 import logging
27 import os
27 import os
28 import re
28 import re
29 import signal
29 import signal
30
30
31 from subprocess import check_call, CalledProcessError, PIPE
31 from subprocess import check_call, CalledProcessError, PIPE
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34
34
35 from IPython.config.application import Application, boolean_flag, catch_config_error
35 from IPython.config.application import Application, boolean_flag, catch_config_error
36 from IPython.config.loader import Config
36 from IPython.config.loader import Config
37 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.application import BaseIPythonApplication
38 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
39 from IPython.utils.daemonize import daemonize
39 from IPython.utils.daemonize import daemonize
40 from IPython.utils.importstring import import_item
40 from IPython.utils.importstring import import_item
41 from IPython.utils.py3compat import string_types
41 from IPython.utils.py3compat import string_types
42 from IPython.utils.sysinfo import num_cpus
42 from IPython.utils.sysinfo import num_cpus
43 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
43 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
44 DottedObjectName)
44 DottedObjectName)
45
45
46 from IPython.parallel.apps.baseapp import (
46 from IPython.parallel.apps.baseapp import (
47 BaseParallelApplication,
47 BaseParallelApplication,
48 PIDFileError,
48 PIDFileError,
49 base_flags, base_aliases
49 base_flags, base_aliases
50 )
50 )
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Module level variables
54 # Module level variables
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57
57
# Top-level description shown in the help output of the main `ipcluster` command.
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide range of
startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start --n=4'. For more complex usage
you will typically do 'ipython profile create mycluster --parallel', then edit
configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
"""

# Usage examples, one string per (sub)command.
_main_examples = """
ipcluster start --n=4 # start a 4 node cluster on localhost
ipcluster start -h    # show the help string for the start subcmd

ipcluster stop -h     # show the help string for the stop subcmd
ipcluster engines -h  # show the help string for the engines subcmd
"""

_start_examples = """
ipython profile create mycluster --parallel # create mycluster profile
ipcluster start --profile=mycluster --n=4   # start mycluster with 4 nodes
"""

_stop_examples = """
ipcluster stop --profile=mycluster          # stop a running cluster by profile name
"""

_engines_examples = """
ipcluster engines --profile=mycluster --n=4 # start 4 engines only
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is no .pid
# file to be found.
NO_CLUSTER = 12
105
105
106
106
107 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
108 # Utilities
108 # Utilities
109 #-----------------------------------------------------------------------------
109 #-----------------------------------------------------------------------------
110
110
def find_launcher_class(clsname, kind):
    """Return a launcher for a given clsname and kind.

    Parameters
    ==========
    clsname : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor,
        WindowsHPC).
    kind : str
        Either 'EngineSet' or 'Controller'.

    Returns
    =======
    The object returned by ``import_item`` for the resolved dotted name.
    Any exception raised by ``import_item`` propagates to the caller.
    """
    if '.' not in clsname:
        # not a module, presume it's the raw name in apps.launcher
        if kind and kind not in clsname:
            # doesn't match necessary full class name, assume it's
            # just 'PBS' or 'MPI' etc prefix:
            clsname = clsname + kind + 'Launcher'
        clsname = 'IPython.parallel.apps.launcher.' + clsname
    return import_item(clsname)
132
132
133 #-----------------------------------------------------------------------------
133 #-----------------------------------------------------------------------------
134 # Main application
134 # Main application
135 #-----------------------------------------------------------------------------
135 #-----------------------------------------------------------------------------
136
136
# Long descriptions used as the `description` of each subcommand application.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>', otherwise
use the '--profile-dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
# Command-line aliases for `ipcluster stop`: `--signal` plus the shared base
# aliases.  base_aliases is applied last, so it wins on any key collision.
stop_aliases = dict(
    signal='IPClusterStop.signal',
)
stop_aliases.update(base_aliases)
173
173
class IPClusterStop(BaseParallelApplication):
    """Application behind ``ipcluster stop``.

    Reads the pid recorded for the cluster and asks that process to stop:
    with ``os.kill`` on POSIX, with ``taskkill`` on Windows.
    """

    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples

    # The right-hand side is evaluated before the class attribute is bound,
    # so `signal.SIGINT` below still refers to the `signal` module.
    signal = Integer(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED when the pid file cannot be read or when
        the recorded pid is not running.  Otherwise the process is signalled
        (POSIX) or its whole process tree is force-killed (Windows).
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name == 'posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                # NOTE(review): the pid file is only removed here on failure;
                # presumably the signalled process cleans up its own pid file
                # on a normal shutdown -- confirm.
                self.remove_pid_file()
        elif os.name == 'nt':
            try:
                # kill the whole tree.  The output is discarded through
                # os.devnull rather than PIPE: check_call never reads from
                # pipes, so a chatty child could block on a full pipe buffer.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
225
225
# Command-line aliases for `ipcluster engines`: the shared base aliases plus
# engine-specific ones (which win on any key collision).
engine_aliases = {}
engine_aliases.update(base_aliases)
engine_aliases.update(dict(
    n='IPClusterEngines.n',
    engines='IPClusterEngines.engine_launcher_class',
    daemonize='IPClusterEngines.daemonize',
))

# Command-line flags for `ipcluster engines`: the shared base flags plus
# `--daemonize`.
engine_flags = {}
engine_flags.update(base_flags)

engine_flags.update(dict(
    daemonize=(
        {'IPClusterEngines' : {'daemonize' : True}},
        """run the cluster into the background (not available on Windows)""",
    )
))
class IPClusterEngines(BaseParallelApplication):
    """Application behind ``ipcluster engines``: launch a set of engines.

    Builds an EngineSet launcher from ``engine_launcher_class``, starts it
    from the event loop, and stops everything on SIGINT or when the engines
    stop within the ``early_shutdown`` window.
    """

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    default_log_level = logging.INFO
    classes = List()

    def _classes_default(self):
        """Default for ``classes``: ProfileDir plus every EngineSet launcher."""
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir] + eslaunchers

    n = Integer(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")

    def _engine_launcher_changed(self, name, old, new):
        """Forward a string assigned to the deprecated trait to the new one.

        Non-string values (such as the launcher instance assigned in
        ``init_launchers``) are left alone.
        """
        if isinstance(new, string_types):
            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
                    " use engine_launcher_class" % self.__class__.__name__)
            self.engine_launcher_class = new

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify its absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher'.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses [default]
            MPI : use mpiexec to launch engines in an MPI environment
            PBS : use PBS (qsub) to submit engines to a batch queue
            SGE : use SGE (qsub) to submit engines to a batch queue
            LSF : use LSF (bsub) to submit engines to a batch queue
            SSH : use SSH to start the controller
                        Note that SSH does *not* move the connection files
                        around, so you will likely have to do this manually
                        unless the machines are on a shared file system.
            HTCondor : use HTCondor to submit engines to a batch queue
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterEngines.engine_launcher_class = 'SSH'

        or:

            ipcluster start --engines=MPI

        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Turn on ``log_to_file`` whenever daemonize is switched on."""
        if new:
            self.log_to_file = True

    early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
    # Set once by stop_launchers() so shutdown is only initiated once.
    _stopping = False

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """Run the base initialization, then install the SIGINT handler and
        build the engine launcher."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Instantiate the EngineSet launcher named by engine_launcher_class."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def init_signal(self):
        """Route SIGINT to sigint_handler."""
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname, kind=None):
        """import and instantiate a Launcher based on importstring

        Logs a fatal message and exits with status 1 if the class cannot be
        imported.
        """
        try:
            klass = find_launcher_class(clsname, kind)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', parent=self, log=self.log,
            profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
        )
        return launcher

    def engines_started_ok(self):
        """Callback fired once the early_shutdown window has elapsed."""
        self.log.info("Engines appear to have started successfully")
        # Zero disables the "stopped early" diagnosis in engines_stopped_early.
        self.early_shutdown = 0

    def start_engines(self):
        """Start the engine launcher and arm the early-shutdown timer."""
        # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
        n = getattr(self.engine_launcher, 'engine_count', self.n)
        self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
        self.engine_launcher.start(self.n)
        self.engine_launcher.on_stop(self.engines_stopped_early)
        if self.early_shutdown:
            # DelayedCallback takes milliseconds; early_shutdown is in seconds.
            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()

    def engines_stopped_early(self, r):
        """on_stop callback for the engine launcher.

        If the engines stop while the early_shutdown window is still open and
        no shutdown is already in progress, log the likely cause and stop the
        launchers.  Always finishes by calling engines_stopped.
        """
        if self.early_shutdown and not self._stopping:
            self.log.error("""
            Engines shutdown early, they probably failed to connect.

            Check the engine log files for output.

            If your controller and engines are not on the same machine, you probably
            have to instruct the controller to listen on an interface other than localhost.

            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.

            Be sure to read our security docs before instructing your controller to listen on
            a public interface.
            """)
            self.stop_launchers()

        return self.engines_stopped(r)

    def engines_stopped(self, r):
        """Stop the event loop; `r` is accepted but not used."""
        return self.loop.stop()

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's stop() returns, or None if the
        launcher was not running.
        """
        if self.engine_launcher.running:
            self.log.info("Stopping Engines...")
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def stop_launchers(self, r=None):
        """Begin shutdown: stop the engines, then stop the loop after 3 s.

        Only the first call has any effect; `r` is accepted but not used.
        """
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: initiate shutdown of the launchers."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Delete old engine/controller log files when clean_logs is set."""
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)-.+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name == 'posix':
                daemonize()
            else:
                # Say so instead of silently running in the foreground.
                self.log.warn("daemonize is not available on this platform"
                        " (os.name=%r), running in the foreground" % os.name)

        # Schedule start_engines to run as soon as the loop is started.
        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # No pid file is written by this subcommand.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is expected when a signal arrives
            # while the loop is polling; anything else is a real error.
            if e.errno != errno.EINTR:
                raise
434
# Command-line aliases for `ipcluster start`: everything `ipcluster engines`
# accepts, plus the controller-related options.
start_aliases = {}
start_aliases.update(engine_aliases)
start_aliases.update(dict(
    delay='IPClusterStart.delay',
    controller='IPClusterStart.controller_launcher_class',
))
# Not a valid Python identifier, so it cannot be passed as a dict() keyword.
start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
444
442
# Application behind ``ipcluster start``.  It extends IPClusterEngines with a
# controller launcher, so that one command starts (and later stops) both the
# controller and the engine set.
# NOTE: documented with comments instead of a class docstring on purpose, so
# that the class ``__doc__`` stays unset exactly as before.
class IPClusterStart(IPClusterEngines):

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()

    def _classes_default(self):
        """Return the default for ``classes``.

        The list holds ProfileDir, IPClusterEngines and every launcher class
        listed in ``launcher.all_launchers``.
        """
        # Imported inside the method, as the original code did.
        from IPython.parallel.apps import launcher
        return [ProfileDir, IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")

    def _controller_launcher_changed(self, name, old, new):
        """Forward an old-style string value to ``controller_launcher_class``.

        Only string values are forwarded (with a deprecation warning); any
        other value, such as the launcher object assigned by
        ``init_launchers``, is left alone.
        """
        if isinstance(new, string_types):
            # old 0.11-style config
            # ``Logger.warn`` is a deprecated alias; ``warning`` is the
            # supported spelling on every Python version.
            self.log.warning("WARNING: %s.controller_launcher is deprecated as of 0.12,"
                    " use controller_launcher_class" % self.__class__.__name__)
            self.controller_launcher_class = new

    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses
            MPI : use mpiexec to launch the controller in an MPI universe
            PBS : use PBS (qsub) to submit the controller to a batch queue
            SGE : use SGE (qsub) to submit the controller to a batch queue
            LSF : use LSF (bsub) to submit the controller to a batch queue
            HTCondor : use HTCondor to submit the controller to a batch queue
            SSH : use SSH to start the controller
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterStart.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # ``flags`` is not redefined here, so it is inherited from the parent class.
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller launcher and the engine-set launcher."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def engines_stopped(self, r):
        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
        pass

    def start_controller(self):
        """Start the controller launcher.

        ``stop_launchers`` is registered as the launcher's on-stop callback
        before the launcher is started.
        """
        self.log.info("Starting Controller with %s", self.controller_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)
        self.controller_launcher.start()

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when
        there was nothing to stop.
        """
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller (unless a stop is already underway), then defer to the parent.

        ``r`` is accepted so the method can be used as an on-stop callback;
        it is not used.
        """
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand.

        Exits with ``ALREADY_STARTED`` when the pid file points at a live
        process.  Otherwise it removes a stale pid file, optionally
        daemonizes, schedules the controller and engine start-up, writes the
        pid file and runs the event loop.  The pid file is removed again
        when the loop ends.
        """
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.', pid
                )
                # Exit with an unusual exit status that other processes
                # can watch for to learn how we exited.
                self.exit(ALREADY_STARTED)
            else:
                # The recorded pid is no longer alive: drop the stale file.
                self.remove_pid_file()

        # Now log and daemonize
        self.log.info('Starting ipcluster with [daemon=%r]', self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        # The controller is scheduled immediately; the engines follow after
        # ``delay`` seconds (DelayedCallback takes milliseconds).
        controller_cb = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        controller_cb.start()
        engines_cb = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        engines_cb.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is treated like a normal stop;
            # anything else is a real error.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
580
578
# Dotted-name prefix used to build the subcommand class names below.
base='IPython.parallel.apps.ipclusterapp.IPCluster'

# Top-level ``ipcluster`` application: it only dispatches to one of the
# subcommands listed in ``subcommands``.
# NOTE: documented with comments instead of a class docstring on purpose, so
# that the class ``__doc__`` stays unset exactly as before.
class IPClusterApp(BaseIPythonApplication):
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # subcommand name -> (dotted class name, help text)
    subcommands = {
        'start' : (base+'Start', start_help),
        'stop' : (base+'Stop', stop_help),
        'engines' : (base+'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1.

        Returns the result of the subapp's ``start()`` when a subcommand was
        given on the command line.
        """
        if self.subapp is not None:
            return self.subapp.start()

        # Format a real list: on Python 3, ``dict.keys()`` is a view whose
        # repr is ``dict_keys([...])``, which is noise in a usage message.
        print("No subcommand specified. Must specify one of: %s" % (list(self.subcommands),))
        print()
        self.print_description()
        self.print_subcommands()
        self.exit(1)
607
605
# Module-level alias of IPClusterApp.launch_instance, used as the entry point below.
launch_new_instance = IPClusterApp.launch_instance

# Launch the application when this file is executed as a script.
if __name__ == '__main__':
    launch_new_instance()
612
610
General Comments 0
You need to be logged in to leave comments. Login now