##// END OF EJS Templates
add Condor to docs in ipclusterapp
James Booth -
Show More
@@ -1,619 +1,620 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag, catch_config_error
34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide range of
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, Condor,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Utilities
109 # Utilities
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111
111
112 def find_launcher_class(clsname, kind):
112 def find_launcher_class(clsname, kind):
113 """Return a launcher for a given clsname and kind.
113 """Return a launcher for a given clsname and kind.
114
114
115 Parameters
115 Parameters
116 ==========
116 ==========
117 clsname : str
117 clsname : str
118 The full name of the launcher class, either with or without the
118 The full name of the launcher class, either with or without the
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, Condor,
120 WindowsHPC).
120 WindowsHPC).
121 kind : str
121 kind : str
122 Either 'EngineSet' or 'Controller'.
122 Either 'EngineSet' or 'Controller'.
123 """
123 """
124 if '.' not in clsname:
124 if '.' not in clsname:
125 # not a module, presume it's the raw name in apps.launcher
125 # not a module, presume it's the raw name in apps.launcher
126 if kind and kind not in clsname:
126 if kind and kind not in clsname:
127 # doesn't match necessary full class name, assume it's
127 # doesn't match necessary full class name, assume it's
128 # just 'PBS' or 'MPI' prefix:
128 # just 'PBS' or 'MPI' etc prefix:
129 clsname = clsname + kind + 'Launcher'
129 clsname = clsname + kind + 'Launcher'
130 clsname = 'IPython.parallel.apps.launcher.'+clsname
130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 klass = import_item(clsname)
131 klass = import_item(clsname)
132 return klass
132 return klass
133
133
134 #-----------------------------------------------------------------------------
134 #-----------------------------------------------------------------------------
135 # Main application
135 # Main application
136 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137
137
138 start_help = """Start an IPython cluster for parallel computing
138 start_help = """Start an IPython cluster for parallel computing
139
139
140 Start an ipython cluster by its profile name or cluster
140 Start an ipython cluster by its profile name or cluster
141 directory. Cluster directories contain configuration, log and
141 directory. Cluster directories contain configuration, log and
142 security related files and are named using the convention
142 security related files and are named using the convention
143 'profile_<name>' and should be created using the 'start'
143 'profile_<name>' and should be created using the 'start'
144 subcommand of 'ipcluster'. If your cluster directory is in
144 subcommand of 'ipcluster'. If your cluster directory is in
145 the cwd or the ipython directory, you can simply refer to it
145 the cwd or the ipython directory, you can simply refer to it
146 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
146 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
147 otherwise use the '--profile-dir' option.
147 otherwise use the '--profile-dir' option.
148 """
148 """
149 stop_help = """Stop a running IPython cluster
149 stop_help = """Stop a running IPython cluster
150
150
151 Stop a running ipython cluster by its profile name or cluster
151 Stop a running ipython cluster by its profile name or cluster
152 directory. Cluster directories are named using the convention
152 directory. Cluster directories are named using the convention
153 'profile_<name>'. If your cluster directory is in
153 'profile_<name>'. If your cluster directory is in
154 the cwd or the ipython directory, you can simply refer to it
154 the cwd or the ipython directory, you can simply refer to it
155 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
155 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
156 use the '--profile-dir' option.
156 use the '--profile-dir' option.
157 """
157 """
158 engines_help = """Start engines connected to an existing IPython cluster
158 engines_help = """Start engines connected to an existing IPython cluster
159
159
160 Start one or more engines to connect to an existing Cluster
160 Start one or more engines to connect to an existing Cluster
161 by profile name or cluster directory.
161 by profile name or cluster directory.
162 Cluster directories contain configuration, log and
162 Cluster directories contain configuration, log and
163 security related files and are named using the convention
163 security related files and are named using the convention
164 'profile_<name>' and should be created using the 'start'
164 'profile_<name>' and should be created using the 'start'
165 subcommand of 'ipcluster'. If your cluster directory is in
165 subcommand of 'ipcluster'. If your cluster directory is in
166 the cwd or the ipython directory, you can simply refer to it
166 the cwd or the ipython directory, you can simply refer to it
167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
168 otherwise use the '--profile-dir' option.
168 otherwise use the '--profile-dir' option.
169 """
169 """
170 stop_aliases = dict(
170 stop_aliases = dict(
171 signal='IPClusterStop.signal',
171 signal='IPClusterStop.signal',
172 )
172 )
173 stop_aliases.update(base_aliases)
173 stop_aliases.update(base_aliases)
174
174
175 class IPClusterStop(BaseParallelApplication):
175 class IPClusterStop(BaseParallelApplication):
176 name = u'ipcluster'
176 name = u'ipcluster'
177 description = stop_help
177 description = stop_help
178 examples = _stop_examples
178 examples = _stop_examples
179 config_file_name = Unicode(default_config_file_name)
179 config_file_name = Unicode(default_config_file_name)
180
180
181 signal = Integer(signal.SIGINT, config=True,
181 signal = Integer(signal.SIGINT, config=True,
182 help="signal to use for stopping processes.")
182 help="signal to use for stopping processes.")
183
183
184 aliases = Dict(stop_aliases)
184 aliases = Dict(stop_aliases)
185
185
186 def start(self):
186 def start(self):
187 """Start the app for the stop subcommand."""
187 """Start the app for the stop subcommand."""
188 try:
188 try:
189 pid = self.get_pid_from_file()
189 pid = self.get_pid_from_file()
190 except PIDFileError:
190 except PIDFileError:
191 self.log.critical(
191 self.log.critical(
192 'Could not read pid file, cluster is probably not running.'
192 'Could not read pid file, cluster is probably not running.'
193 )
193 )
194 # Here I exit with an unusual exit status that other processes
194 # Here I exit with an unusual exit status that other processes
195 # can watch for to learn how I exited.
195 # can watch for to learn how I exited.
196 self.remove_pid_file()
196 self.remove_pid_file()
197 self.exit(ALREADY_STOPPED)
197 self.exit(ALREADY_STOPPED)
198
198
199 if not self.check_pid(pid):
199 if not self.check_pid(pid):
200 self.log.critical(
200 self.log.critical(
201 'Cluster [pid=%r] is not running.' % pid
201 'Cluster [pid=%r] is not running.' % pid
202 )
202 )
203 self.remove_pid_file()
203 self.remove_pid_file()
204 # Here I exit with an unusual exit status that other processes
204 # Here I exit with an unusual exit status that other processes
205 # can watch for to learn how I exited.
205 # can watch for to learn how I exited.
206 self.exit(ALREADY_STOPPED)
206 self.exit(ALREADY_STOPPED)
207
207
208 elif os.name=='posix':
208 elif os.name=='posix':
209 sig = self.signal
209 sig = self.signal
210 self.log.info(
210 self.log.info(
211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
212 )
212 )
213 try:
213 try:
214 os.kill(pid, sig)
214 os.kill(pid, sig)
215 except OSError:
215 except OSError:
216 self.log.error("Stopping cluster failed, assuming already dead.",
216 self.log.error("Stopping cluster failed, assuming already dead.",
217 exc_info=True)
217 exc_info=True)
218 self.remove_pid_file()
218 self.remove_pid_file()
219 elif os.name=='nt':
219 elif os.name=='nt':
220 try:
220 try:
221 # kill the whole tree
221 # kill the whole tree
222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
223 except (CalledProcessError, OSError):
223 except (CalledProcessError, OSError):
224 self.log.error("Stopping cluster failed, assuming already dead.",
224 self.log.error("Stopping cluster failed, assuming already dead.",
225 exc_info=True)
225 exc_info=True)
226 self.remove_pid_file()
226 self.remove_pid_file()
227
227
228 engine_aliases = {}
228 engine_aliases = {}
229 engine_aliases.update(base_aliases)
229 engine_aliases.update(base_aliases)
230 engine_aliases.update(dict(
230 engine_aliases.update(dict(
231 n='IPClusterEngines.n',
231 n='IPClusterEngines.n',
232 engines = 'IPClusterEngines.engine_launcher_class',
232 engines = 'IPClusterEngines.engine_launcher_class',
233 daemonize = 'IPClusterEngines.daemonize',
233 daemonize = 'IPClusterEngines.daemonize',
234 ))
234 ))
235 engine_flags = {}
235 engine_flags = {}
236 engine_flags.update(base_flags)
236 engine_flags.update(base_flags)
237
237
238 engine_flags.update(dict(
238 engine_flags.update(dict(
239 daemonize=(
239 daemonize=(
240 {'IPClusterEngines' : {'daemonize' : True}},
240 {'IPClusterEngines' : {'daemonize' : True}},
241 """run the cluster into the background (not available on Windows)""",
241 """run the cluster into the background (not available on Windows)""",
242 )
242 )
243 ))
243 ))
244 class IPClusterEngines(BaseParallelApplication):
244 class IPClusterEngines(BaseParallelApplication):
245
245
246 name = u'ipcluster'
246 name = u'ipcluster'
247 description = engines_help
247 description = engines_help
248 examples = _engines_examples
248 examples = _engines_examples
249 usage = None
249 usage = None
250 config_file_name = Unicode(default_config_file_name)
250 config_file_name = Unicode(default_config_file_name)
251 default_log_level = logging.INFO
251 default_log_level = logging.INFO
252 classes = List()
252 classes = List()
253 def _classes_default(self):
253 def _classes_default(self):
254 from IPython.parallel.apps import launcher
254 from IPython.parallel.apps import launcher
255 launchers = launcher.all_launchers
255 launchers = launcher.all_launchers
256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
257 return [ProfileDir]+eslaunchers
257 return [ProfileDir]+eslaunchers
258
258
259 n = Integer(num_cpus(), config=True,
259 n = Integer(num_cpus(), config=True,
260 help="""The number of engines to start. The default is to use one for each
260 help="""The number of engines to start. The default is to use one for each
261 CPU on your machine""")
261 CPU on your machine""")
262
262
263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
264 def _engine_launcher_changed(self, name, old, new):
264 def _engine_launcher_changed(self, name, old, new):
265 if isinstance(new, basestring):
265 if isinstance(new, basestring):
266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
267 " use engine_launcher_class" % self.__class__.__name__)
267 " use engine_launcher_class" % self.__class__.__name__)
268 self.engine_launcher_class = new
268 self.engine_launcher_class = new
269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
270 config=True,
270 config=True,
271 help="""The class for launching a set of Engines. Change this value
271 help="""The class for launching a set of Engines. Change this value
272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
273 Each launcher class has its own set of configuration options, for making sure
273 Each launcher class has its own set of configuration options, for making sure
274 it will work in your environment.
274 it will work in your environment.
275
275
276 You can also write your own launcher, and specify its absolute import path,
276 You can also write your own launcher, and specify its absolute import path,
277 as in 'mymodule.launcher.FTLEnginesLauncher'.
277 as in 'mymodule.launcher.FTLEnginesLauncher'.
278
278
279 IPython's bundled examples include:
279 IPython's bundled examples include:
280
280
281 Local : start engines locally as subprocesses [default]
281 Local : start engines locally as subprocesses [default]
282 MPI : use mpiexec to launch engines in an MPI environment
282 MPI : use mpiexec to launch engines in an MPI environment
283 PBS : use PBS (qsub) to submit engines to a batch queue
283 PBS : use PBS (qsub) to submit engines to a batch queue
284 SGE : use SGE (qsub) to submit engines to a batch queue
284 SGE : use SGE (qsub) to submit engines to a batch queue
285 LSF : use LSF (bsub) to submit engines to a batch queue
285 LSF : use LSF (bsub) to submit engines to a batch queue
286 SSH : use SSH to start the controller
286 SSH : use SSH to start the controller
287 Note that SSH does *not* move the connection files
287 Note that SSH does *not* move the connection files
288 around, so you will likely have to do this manually
288 around, so you will likely have to do this manually
289 unless the machines are on a shared file system.
289 unless the machines are on a shared file system.
290 Condor : use HTCondor to submit engines to a batch queue
290 WindowsHPC : use Windows HPC
291 WindowsHPC : use Windows HPC
291
292
292 If you are using one of IPython's builtin launchers, you can specify just the
293 If you are using one of IPython's builtin launchers, you can specify just the
293 prefix, e.g:
294 prefix, e.g:
294
295
295 c.IPClusterEngines.engine_launcher_class = 'SSH'
296 c.IPClusterEngines.engine_launcher_class = 'SSH'
296
297
297 or:
298 or:
298
299
299 ipcluster start --engines=MPI
300 ipcluster start --engines=MPI
300
301
301 """
302 """
302 )
303 )
303 daemonize = Bool(False, config=True,
304 daemonize = Bool(False, config=True,
304 help="""Daemonize the ipcluster program. This implies --log-to-file.
305 help="""Daemonize the ipcluster program. This implies --log-to-file.
305 Not available on Windows.
306 Not available on Windows.
306 """)
307 """)
307
308
308 def _daemonize_changed(self, name, old, new):
309 def _daemonize_changed(self, name, old, new):
309 if new:
310 if new:
310 self.log_to_file = True
311 self.log_to_file = True
311
312
312 early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
313 early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
313 _stopping = False
314 _stopping = False
314
315
315 aliases = Dict(engine_aliases)
316 aliases = Dict(engine_aliases)
316 flags = Dict(engine_flags)
317 flags = Dict(engine_flags)
317
318
318 @catch_config_error
319 @catch_config_error
319 def initialize(self, argv=None):
320 def initialize(self, argv=None):
320 super(IPClusterEngines, self).initialize(argv)
321 super(IPClusterEngines, self).initialize(argv)
321 self.init_signal()
322 self.init_signal()
322 self.init_launchers()
323 self.init_launchers()
323
324
324 def init_launchers(self):
325 def init_launchers(self):
325 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
326 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
326
327
327 def init_signal(self):
328 def init_signal(self):
328 # Setup signals
329 # Setup signals
329 signal.signal(signal.SIGINT, self.sigint_handler)
330 signal.signal(signal.SIGINT, self.sigint_handler)
330
331
331 def build_launcher(self, clsname, kind=None):
332 def build_launcher(self, clsname, kind=None):
332 """import and instantiate a Launcher based on importstring"""
333 """import and instantiate a Launcher based on importstring"""
333 try:
334 try:
334 klass = find_launcher_class(clsname, kind)
335 klass = find_launcher_class(clsname, kind)
335 except (ImportError, KeyError):
336 except (ImportError, KeyError):
336 self.log.fatal("Could not import launcher class: %r"%clsname)
337 self.log.fatal("Could not import launcher class: %r"%clsname)
337 self.exit(1)
338 self.exit(1)
338
339
339 launcher = klass(
340 launcher = klass(
340 work_dir=u'.', config=self.config, log=self.log,
341 work_dir=u'.', config=self.config, log=self.log,
341 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
342 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
342 )
343 )
343 return launcher
344 return launcher
344
345
345 def engines_started_ok(self):
346 def engines_started_ok(self):
346 self.log.info("Engines appear to have started successfully")
347 self.log.info("Engines appear to have started successfully")
347 self.early_shutdown = 0
348 self.early_shutdown = 0
348
349
349 def start_engines(self):
350 def start_engines(self):
350 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
351 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
351 n = getattr(self.engine_launcher, 'engine_count', self.n)
352 n = getattr(self.engine_launcher, 'engine_count', self.n)
352 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
353 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
353 self.engine_launcher.start(self.n)
354 self.engine_launcher.start(self.n)
354 self.engine_launcher.on_stop(self.engines_stopped_early)
355 self.engine_launcher.on_stop(self.engines_stopped_early)
355 if self.early_shutdown:
356 if self.early_shutdown:
356 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
357 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
357
358
358 def engines_stopped_early(self, r):
359 def engines_stopped_early(self, r):
359 if self.early_shutdown and not self._stopping:
360 if self.early_shutdown and not self._stopping:
360 self.log.error("""
361 self.log.error("""
361 Engines shutdown early, they probably failed to connect.
362 Engines shutdown early, they probably failed to connect.
362
363
363 Check the engine log files for output.
364 Check the engine log files for output.
364
365
365 If your controller and engines are not on the same machine, you probably
366 If your controller and engines are not on the same machine, you probably
366 have to instruct the controller to listen on an interface other than localhost.
367 have to instruct the controller to listen on an interface other than localhost.
367
368
368 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
369 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
369
370
370 Be sure to read our security docs before instructing your controller to listen on
371 Be sure to read our security docs before instructing your controller to listen on
371 a public interface.
372 a public interface.
372 """)
373 """)
373 self.stop_launchers()
374 self.stop_launchers()
374
375
375 return self.engines_stopped(r)
376 return self.engines_stopped(r)
376
377
377 def engines_stopped(self, r):
378 def engines_stopped(self, r):
378 return self.loop.stop()
379 return self.loop.stop()
379
380
380 def stop_engines(self):
381 def stop_engines(self):
381 if self.engine_launcher.running:
382 if self.engine_launcher.running:
382 self.log.info("Stopping Engines...")
383 self.log.info("Stopping Engines...")
383 d = self.engine_launcher.stop()
384 d = self.engine_launcher.stop()
384 return d
385 return d
385 else:
386 else:
386 return None
387 return None
387
388
388 def stop_launchers(self, r=None):
389 def stop_launchers(self, r=None):
389 if not self._stopping:
390 if not self._stopping:
390 self._stopping = True
391 self._stopping = True
391 self.log.error("IPython cluster: stopping")
392 self.log.error("IPython cluster: stopping")
392 self.stop_engines()
393 self.stop_engines()
393 # Wait a few seconds to let things shut down.
394 # Wait a few seconds to let things shut down.
394 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
395 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
395 dc.start()
396 dc.start()
396
397
397 def sigint_handler(self, signum, frame):
398 def sigint_handler(self, signum, frame):
398 self.log.debug("SIGINT received, stopping launchers...")
399 self.log.debug("SIGINT received, stopping launchers...")
399 self.stop_launchers()
400 self.stop_launchers()
400
401
401 def start_logging(self):
402 def start_logging(self):
402 # Remove old log files of the controller and engine
403 # Remove old log files of the controller and engine
403 if self.clean_logs:
404 if self.clean_logs:
404 log_dir = self.profile_dir.log_dir
405 log_dir = self.profile_dir.log_dir
405 for f in os.listdir(log_dir):
406 for f in os.listdir(log_dir):
406 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
407 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
407 os.remove(os.path.join(log_dir, f))
408 os.remove(os.path.join(log_dir, f))
408 # This will remove old log files for ipcluster itself
409 # This will remove old log files for ipcluster itself
409 # super(IPBaseParallelApplication, self).start_logging()
410 # super(IPBaseParallelApplication, self).start_logging()
410
411
411 def start(self):
412 def start(self):
412 """Start the app for the engines subcommand."""
413 """Start the app for the engines subcommand."""
413 self.log.info("IPython cluster: started")
414 self.log.info("IPython cluster: started")
414 # First see if the cluster is already running
415 # First see if the cluster is already running
415
416
416 # Now log and daemonize
417 # Now log and daemonize
417 self.log.info(
418 self.log.info(
418 'Starting engines with [daemon=%r]' % self.daemonize
419 'Starting engines with [daemon=%r]' % self.daemonize
419 )
420 )
420 # TODO: Get daemonize working on Windows or as a Windows Server.
421 # TODO: Get daemonize working on Windows or as a Windows Server.
421 if self.daemonize:
422 if self.daemonize:
422 if os.name=='posix':
423 if os.name=='posix':
423 daemonize()
424 daemonize()
424
425
425 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 dc.start()
427 dc.start()
427 # Now write the new pid file AFTER our new forked pid is active.
428 # Now write the new pid file AFTER our new forked pid is active.
428 # self.write_pid_file()
429 # self.write_pid_file()
429 try:
430 try:
430 self.loop.start()
431 self.loop.start()
431 except KeyboardInterrupt:
432 except KeyboardInterrupt:
432 pass
433 pass
433 except zmq.ZMQError as e:
434 except zmq.ZMQError as e:
434 if e.errno == errno.EINTR:
435 if e.errno == errno.EINTR:
435 pass
436 pass
436 else:
437 else:
437 raise
438 raise
438
439
439 start_aliases = {}
440 start_aliases = {}
440 start_aliases.update(engine_aliases)
441 start_aliases.update(engine_aliases)
441 start_aliases.update(dict(
442 start_aliases.update(dict(
442 delay='IPClusterStart.delay',
443 delay='IPClusterStart.delay',
443 controller = 'IPClusterStart.controller_launcher_class',
444 controller = 'IPClusterStart.controller_launcher_class',
444 ))
445 ))
445 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446
447
# Application behind ``ipcluster start``: launches a controller and then,
# ``delay`` seconds later, the engines, all driven from a single event loop.
# (Kept as a comment rather than a class docstring so the class ``__doc__``
# that config/help generation may consult is left unchanged.)
class IPClusterStart(IPClusterEngines):

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        """Return the default ``classes`` list: ProfileDir, the engines app and every launcher."""
        # Imported lazily, as in the original code, rather than at module load.
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
    def _controller_launcher_changed(self, name, old, new):
        """Forward an old-style string ``controller_launcher`` value.

        A string value is 0.11-style configuration: warn about the deprecation
        and copy the value to ``controller_launcher_class``. Non-string values
        (e.g. the launcher instance assigned in ``init_launchers``) are ignored.
        """
        if isinstance(new, basestring):
            # old 0.11-style config
            self.log.warning("WARNING: %s.controller_launcher is deprecated as of 0.12,"
                    " use controller_launcher_class", self.__class__.__name__)
            self.controller_launcher_class = new
    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        IPython's bundled examples include:

            Local : start the controller locally as a subprocess
            MPI : use mpiexec to launch the controller in an MPI universe
            PBS : use PBS (qsub) to submit the controller to a batch queue
            SGE : use SGE (qsub) to submit the controller to a batch queue
            LSF : use LSF (bsub) to submit the controller to a batch queue
            Condor: use HTCondor to submit the controller to a batch queue
            SSH : use SSH to start the controller
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterStart.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller launcher and the engine-set launcher."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def engines_stopped(self, r):
        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
        pass

    def start_controller(self):
        """Start the controller launcher, arranging for everything to stop when it stops."""
        self.log.info("Starting Controller with %s", self.controller_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)
        self.controller_launcher.start()

    def stop_controller(self):
        """Stop the controller launcher if it exists and is running.

        Returns whatever the launcher's ``stop()`` returns, or None when there
        is nothing to stop.
        """
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller, then let the parent class stop the engines.

        ``r`` is accepted so this method can be used as an ``on_stop``
        callback; it is not used.
        """
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand.

        Exits with ``ALREADY_STARTED`` when the pid file refers to a live
        process and removes a stale pid file otherwise. Then optionally
        daemonizes (posix only), schedules the controller and engine launches
        on the event loop, writes the pid file and runs the loop. The pid file
        is removed again when the loop exits, however it exits.
        """
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.', pid
                )
                # Here I exit with an unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                self.remove_pid_file()

        # Now log and daemonize
        self.log.info('Starting ipcluster with [daemon=%r]', self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name == 'posix':
                daemonize()

        # The controller is scheduled immediately; the engines follow after
        # ``delay`` seconds (DelayedCallback takes milliseconds).
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is treated like a normal shutdown;
            # any other ZMQ error is propagated.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
582
583
# Dotted-path prefix shared by the subcommand application classes; the
# subcommand table appends 'Start', 'Stop' or 'Engines' to it.
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'
584
585
# Top-level ``ipcluster`` application. It defines no aliases or flags of its
# own and only dispatches to one of the subcommand applications below.
class IPClusterApp(BaseIPythonApplication):
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # subcommand name -> (dotted path of the application class, help text)
    subcommands = {
        'start': (base + 'Start', start_help),
        'stop': (base + 'Stop', stop_help),
        'engines': (base + 'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit(1) if none was given."""
        if self.subapp is not None:
            return self.subapp.start()

        # Single-argument parenthesised prints emit the same text as the
        # equivalent print statements.
        print("No subcommand specified. Must specify one of: %s" % (self.subcommands.keys()))
        print("")
        self.print_description()
        self.print_subcommands()
        self.exit(1)
609
610
def launch_new_instance():
    """Create and run the IPython cluster.

    Obtains the IPClusterApp instance via ``instance()``, initializes it and
    then starts it.
    """
    cluster_app = IPClusterApp.instance()
    cluster_app.initialize()
    cluster_app.start()
615
616
616
617
# Script entry point: run the ipcluster application when this module is
# executed directly rather than imported.
if __name__ == "__main__":
    launch_new_instance()
619
620
General Comments 0
You need to be logged in to leave comments. Login now