##// END OF EJS Templates
allow launcher specification by prefix alone...
MinRK -
Show More
@@ -1,525 +1,528 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 start_help = """Start an IPython cluster for parallel computing
111 start_help = """Start an IPython cluster for parallel computing
112
112
113 Start an ipython cluster by its profile name or cluster
113 Start an ipython cluster by its profile name or cluster
114 directory. Cluster directories contain configuration, log and
114 directory. Cluster directories contain configuration, log and
115 security related files and are named using the convention
115 security related files and are named using the convention
116 'profile_<name>' and should be creating using the 'start'
116 'profile_<name>' and should be creating using the 'start'
117 subcommand of 'ipcluster'. If your cluster directory is in
117 subcommand of 'ipcluster'. If your cluster directory is in
118 the cwd or the ipython directory, you can simply refer to it
118 the cwd or the ipython directory, you can simply refer to it
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
120 otherwise use the 'profile-dir' option.
120 otherwise use the 'profile-dir' option.
121 """
121 """
122 stop_help = """Stop a running IPython cluster
122 stop_help = """Stop a running IPython cluster
123
123
124 Stop a running ipython cluster by its profile name or cluster
124 Stop a running ipython cluster by its profile name or cluster
125 directory. Cluster directories are named using the convention
125 directory. Cluster directories are named using the convention
126 'profile_<name>'. If your cluster directory is in
126 'profile_<name>'. If your cluster directory is in
127 the cwd or the ipython directory, you can simply refer to it
127 the cwd or the ipython directory, you can simply refer to it
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
129 use the '--profile-dir' option.
129 use the '--profile-dir' option.
130 """
130 """
131 engines_help = """Start engines connected to an existing IPython cluster
131 engines_help = """Start engines connected to an existing IPython cluster
132
132
133 Start one or more engines to connect to an existing Cluster
133 Start one or more engines to connect to an existing Cluster
134 by profile name or cluster directory.
134 by profile name or cluster directory.
135 Cluster directories contain configuration, log and
135 Cluster directories contain configuration, log and
136 security related files and are named using the convention
136 security related files and are named using the convention
137 'profile_<name>' and should be creating using the 'start'
137 'profile_<name>' and should be creating using the 'start'
138 subcommand of 'ipcluster'. If your cluster directory is in
138 subcommand of 'ipcluster'. If your cluster directory is in
139 the cwd or the ipython directory, you can simply refer to it
139 the cwd or the ipython directory, you can simply refer to it
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
141 otherwise use the 'profile-dir' option.
141 otherwise use the 'profile-dir' option.
142 """
142 """
143 stop_aliases = dict(
143 stop_aliases = dict(
144 signal='IPClusterStop.signal',
144 signal='IPClusterStop.signal',
145 )
145 )
146 stop_aliases.update(base_aliases)
146 stop_aliases.update(base_aliases)
147
147
148 class IPClusterStop(BaseParallelApplication):
148 class IPClusterStop(BaseParallelApplication):
149 name = u'ipcluster'
149 name = u'ipcluster'
150 description = stop_help
150 description = stop_help
151 examples = _stop_examples
151 examples = _stop_examples
152 config_file_name = Unicode(default_config_file_name)
152 config_file_name = Unicode(default_config_file_name)
153
153
154 signal = Int(signal.SIGINT, config=True,
154 signal = Int(signal.SIGINT, config=True,
155 help="signal to use for stopping processes.")
155 help="signal to use for stopping processes.")
156
156
157 aliases = Dict(stop_aliases)
157 aliases = Dict(stop_aliases)
158
158
159 def start(self):
159 def start(self):
160 """Start the app for the stop subcommand."""
160 """Start the app for the stop subcommand."""
161 try:
161 try:
162 pid = self.get_pid_from_file()
162 pid = self.get_pid_from_file()
163 except PIDFileError:
163 except PIDFileError:
164 self.log.critical(
164 self.log.critical(
165 'Could not read pid file, cluster is probably not running.'
165 'Could not read pid file, cluster is probably not running.'
166 )
166 )
167 # Here I exit with a unusual exit status that other processes
167 # Here I exit with a unusual exit status that other processes
168 # can watch for to learn how I existed.
168 # can watch for to learn how I existed.
169 self.remove_pid_file()
169 self.remove_pid_file()
170 self.exit(ALREADY_STOPPED)
170 self.exit(ALREADY_STOPPED)
171
171
172 if not self.check_pid(pid):
172 if not self.check_pid(pid):
173 self.log.critical(
173 self.log.critical(
174 'Cluster [pid=%r] is not running.' % pid
174 'Cluster [pid=%r] is not running.' % pid
175 )
175 )
176 self.remove_pid_file()
176 self.remove_pid_file()
177 # Here I exit with a unusual exit status that other processes
177 # Here I exit with a unusual exit status that other processes
178 # can watch for to learn how I existed.
178 # can watch for to learn how I existed.
179 self.exit(ALREADY_STOPPED)
179 self.exit(ALREADY_STOPPED)
180
180
181 elif os.name=='posix':
181 elif os.name=='posix':
182 sig = self.signal
182 sig = self.signal
183 self.log.info(
183 self.log.info(
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 )
185 )
186 try:
186 try:
187 os.kill(pid, sig)
187 os.kill(pid, sig)
188 except OSError:
188 except OSError:
189 self.log.error("Stopping cluster failed, assuming already dead.",
189 self.log.error("Stopping cluster failed, assuming already dead.",
190 exc_info=True)
190 exc_info=True)
191 self.remove_pid_file()
191 self.remove_pid_file()
192 elif os.name=='nt':
192 elif os.name=='nt':
193 try:
193 try:
194 # kill the whole tree
194 # kill the whole tree
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 except (CalledProcessError, OSError):
196 except (CalledProcessError, OSError):
197 self.log.error("Stopping cluster failed, assuming already dead.",
197 self.log.error("Stopping cluster failed, assuming already dead.",
198 exc_info=True)
198 exc_info=True)
199 self.remove_pid_file()
199 self.remove_pid_file()
200
200
201 engine_aliases = {}
201 engine_aliases = {}
202 engine_aliases.update(base_aliases)
202 engine_aliases.update(base_aliases)
203 engine_aliases.update(dict(
203 engine_aliases.update(dict(
204 n='IPClusterEngines.n',
204 n='IPClusterEngines.n',
205 engines = 'IPClusterEngines.engine_launcher_class',
205 engines = 'IPClusterEngines.engine_launcher_class',
206 daemonize = 'IPClusterEngines.daemonize',
206 daemonize = 'IPClusterEngines.daemonize',
207 ))
207 ))
208 engine_flags = {}
208 engine_flags = {}
209 engine_flags.update(base_flags)
209 engine_flags.update(base_flags)
210
210
211 engine_flags.update(dict(
211 engine_flags.update(dict(
212 daemonize=(
212 daemonize=(
213 {'IPClusterEngines' : {'daemonize' : True}},
213 {'IPClusterEngines' : {'daemonize' : True}},
214 """run the cluster into the background (not available on Windows)""",
214 """run the cluster into the background (not available on Windows)""",
215 )
215 )
216 ))
216 ))
217 class IPClusterEngines(BaseParallelApplication):
217 class IPClusterEngines(BaseParallelApplication):
218
218
219 name = u'ipcluster'
219 name = u'ipcluster'
220 description = engines_help
220 description = engines_help
221 examples = _engines_examples
221 examples = _engines_examples
222 usage = None
222 usage = None
223 config_file_name = Unicode(default_config_file_name)
223 config_file_name = Unicode(default_config_file_name)
224 default_log_level = logging.INFO
224 default_log_level = logging.INFO
225 classes = List()
225 classes = List()
226 def _classes_default(self):
226 def _classes_default(self):
227 from IPython.parallel.apps import launcher
227 from IPython.parallel.apps import launcher
228 launchers = launcher.all_launchers
228 launchers = launcher.all_launchers
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 return [ProfileDir]+eslaunchers
230 return [ProfileDir]+eslaunchers
231
231
232 n = Int(num_cpus(), config=True,
232 n = Int(num_cpus(), config=True,
233 help="""The number of engines to start. The default is to use one for each
233 help="""The number of engines to start. The default is to use one for each
234 CPU on your machine""")
234 CPU on your machine""")
235
235
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
237 config=True,
237 config=True,
238 help="""The class for launching a set of Engines. Change this value
238 help="""The class for launching a set of Engines. Change this value
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
240 Each launcher class has its own set of configuration options, for making sure
240 Each launcher class has its own set of configuration options, for making sure
241 it will work in your environment.
241 it will work in your environment.
242
242
243 You can also write your own launcher, and specify it's absolute import path,
243 You can also write your own launcher, and specify it's absolute import path,
244 as in 'mymodule.launcher.FTLEnginesLauncher`.
244 as in 'mymodule.launcher.FTLEnginesLauncher`.
245
245
246 Examples include:
246 Examples include:
247
247
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
252 SSHEngineSetLauncher : use SSH to start the controller
252 SSHEngineSetLauncher : use SSH to start the controller
253 Note that SSH does *not* move the connection files
253 Note that SSH does *not* move the connection files
254 around, so you will likely have to do this manually
254 around, so you will likely have to do this manually
255 unless the machines are on a shared file system.
255 unless the machines are on a shared file system.
256 WindowsHPCEngineSetLauncher : use Windows HPC
256 WindowsHPCEngineSetLauncher : use Windows HPC
257 """
257 """
258 )
258 )
259 daemonize = Bool(False, config=True,
259 daemonize = Bool(False, config=True,
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
261 Not available on Windows.
261 Not available on Windows.
262 """)
262 """)
263
263
264 def _daemonize_changed(self, name, old, new):
264 def _daemonize_changed(self, name, old, new):
265 if new:
265 if new:
266 self.log_to_file = True
266 self.log_to_file = True
267
267
268 aliases = Dict(engine_aliases)
268 aliases = Dict(engine_aliases)
269 flags = Dict(engine_flags)
269 flags = Dict(engine_flags)
270 _stopping = False
270 _stopping = False
271
271
272 def initialize(self, argv=None):
272 def initialize(self, argv=None):
273 super(IPClusterEngines, self).initialize(argv)
273 super(IPClusterEngines, self).initialize(argv)
274 self.init_signal()
274 self.init_signal()
275 self.init_launchers()
275 self.init_launchers()
276
276
277 def init_launchers(self):
277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280
280
281 def init_signal(self):
281 def init_signal(self):
282 # Setup signals
282 # Setup signals
283 signal.signal(signal.SIGINT, self.sigint_handler)
283 signal.signal(signal.SIGINT, self.sigint_handler)
284
284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname, kind=None):
286 """import and instantiate a Launcher based on importstring"""
286 """import and instantiate a Launcher based on importstring"""
287 if '.' not in clsname:
287 if '.' not in clsname:
288 # not a module, presume it's the raw name in apps.launcher
288 # not a module, presume it's the raw name in apps.launcher
289 if kind and kind not in clsname:
290 # doesn't match necessary full class name, assume it's
291 # just 'PBS' or 'MPIExec' prefix:
292 clsname = clsname + kind + 'Launcher'
289 clsname = 'IPython.parallel.apps.launcher.'+clsname
293 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
291 try:
294 try:
292 klass = import_item(clsname)
295 klass = import_item(clsname)
293 except (ImportError, KeyError):
296 except (ImportError, KeyError):
294 self.log.fatal("Could not import launcher class: %r"%clsname)
297 self.log.fatal("Could not import launcher class: %r"%clsname)
295 self.exit(1)
298 self.exit(1)
296
299
297 launcher = klass(
300 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log,
301 work_dir=u'.', config=self.config, log=self.log,
299 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
302 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
300 )
303 )
301 return launcher
304 return launcher
302
305
303 def start_engines(self):
306 def start_engines(self):
304 self.log.info("Starting %i engines"%self.n)
307 self.log.info("Starting %i engines"%self.n)
305 self.engine_launcher.start(self.n)
308 self.engine_launcher.start(self.n)
306
309
307 def stop_engines(self):
310 def stop_engines(self):
308 self.log.info("Stopping Engines...")
311 self.log.info("Stopping Engines...")
309 if self.engine_launcher.running:
312 if self.engine_launcher.running:
310 d = self.engine_launcher.stop()
313 d = self.engine_launcher.stop()
311 return d
314 return d
312 else:
315 else:
313 return None
316 return None
314
317
315 def stop_launchers(self, r=None):
318 def stop_launchers(self, r=None):
316 if not self._stopping:
319 if not self._stopping:
317 self._stopping = True
320 self._stopping = True
318 self.log.error("IPython cluster: stopping")
321 self.log.error("IPython cluster: stopping")
319 self.stop_engines()
322 self.stop_engines()
320 # Wait a few seconds to let things shut down.
323 # Wait a few seconds to let things shut down.
321 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
324 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
322 dc.start()
325 dc.start()
323
326
324 def sigint_handler(self, signum, frame):
327 def sigint_handler(self, signum, frame):
325 self.log.debug("SIGINT received, stopping launchers...")
328 self.log.debug("SIGINT received, stopping launchers...")
326 self.stop_launchers()
329 self.stop_launchers()
327
330
328 def start_logging(self):
331 def start_logging(self):
329 # Remove old log files of the controller and engine
332 # Remove old log files of the controller and engine
330 if self.clean_logs:
333 if self.clean_logs:
331 log_dir = self.profile_dir.log_dir
334 log_dir = self.profile_dir.log_dir
332 for f in os.listdir(log_dir):
335 for f in os.listdir(log_dir):
333 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
336 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
334 os.remove(os.path.join(log_dir, f))
337 os.remove(os.path.join(log_dir, f))
335 # This will remove old log files for ipcluster itself
338 # This will remove old log files for ipcluster itself
336 # super(IPBaseParallelApplication, self).start_logging()
339 # super(IPBaseParallelApplication, self).start_logging()
337
340
338 def start(self):
341 def start(self):
339 """Start the app for the engines subcommand."""
342 """Start the app for the engines subcommand."""
340 self.log.info("IPython cluster: started")
343 self.log.info("IPython cluster: started")
341 # First see if the cluster is already running
344 # First see if the cluster is already running
342
345
343 # Now log and daemonize
346 # Now log and daemonize
344 self.log.info(
347 self.log.info(
345 'Starting engines with [daemon=%r]' % self.daemonize
348 'Starting engines with [daemon=%r]' % self.daemonize
346 )
349 )
347 # TODO: Get daemonize working on Windows or as a Windows Server.
350 # TODO: Get daemonize working on Windows or as a Windows Server.
348 if self.daemonize:
351 if self.daemonize:
349 if os.name=='posix':
352 if os.name=='posix':
350 daemonize()
353 daemonize()
351
354
352 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
355 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
353 dc.start()
356 dc.start()
354 # Now write the new pid file AFTER our new forked pid is active.
357 # Now write the new pid file AFTER our new forked pid is active.
355 # self.write_pid_file()
358 # self.write_pid_file()
356 try:
359 try:
357 self.loop.start()
360 self.loop.start()
358 except KeyboardInterrupt:
361 except KeyboardInterrupt:
359 pass
362 pass
360 except zmq.ZMQError as e:
363 except zmq.ZMQError as e:
361 if e.errno == errno.EINTR:
364 if e.errno == errno.EINTR:
362 pass
365 pass
363 else:
366 else:
364 raise
367 raise
365
368
366 start_aliases = {}
369 start_aliases = {}
367 start_aliases.update(engine_aliases)
370 start_aliases.update(engine_aliases)
368 start_aliases.update(dict(
371 start_aliases.update(dict(
369 delay='IPClusterStart.delay',
372 delay='IPClusterStart.delay',
370 controller = 'IPClusterStart.controller_launcher_class',
373 controller = 'IPClusterStart.controller_launcher_class',
371 ))
374 ))
372 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
375 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
373
376
374 # set inherited Start keys directly, to ensure command-line args get higher priority
377 # set inherited Start keys directly, to ensure command-line args get higher priority
375 # than config file options.
378 # than config file options.
376 for key,value in start_aliases.items():
379 for key,value in start_aliases.items():
377 if value.startswith('IPClusterEngines'):
380 if value.startswith('IPClusterEngines'):
378 start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
381 start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
379
382
380 class IPClusterStart(IPClusterEngines):
383 class IPClusterStart(IPClusterEngines):
381
384
382 name = u'ipcluster'
385 name = u'ipcluster'
383 description = start_help
386 description = start_help
384 examples = _start_examples
387 examples = _start_examples
385 default_log_level = logging.INFO
388 default_log_level = logging.INFO
386 auto_create = Bool(True, config=True,
389 auto_create = Bool(True, config=True,
387 help="whether to create the profile_dir if it doesn't exist")
390 help="whether to create the profile_dir if it doesn't exist")
388 classes = List()
391 classes = List()
389 def _classes_default(self,):
392 def _classes_default(self,):
390 from IPython.parallel.apps import launcher
393 from IPython.parallel.apps import launcher
391 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
394 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
392
395
393 clean_logs = Bool(True, config=True,
396 clean_logs = Bool(True, config=True,
394 help="whether to cleanup old logs before starting")
397 help="whether to cleanup old logs before starting")
395
398
396 delay = CFloat(1., config=True,
399 delay = CFloat(1., config=True,
397 help="delay (in s) between starting the controller and the engines")
400 help="delay (in s) between starting the controller and the engines")
398
401
399 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
402 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
400 config=True,
403 config=True,
401 helep="""The class for launching a Controller. Change this value if you want
404 helep="""The class for launching a Controller. Change this value if you want
402 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
405 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
403
406
404 Each launcher class has its own set of configuration options, for making sure
407 Each launcher class has its own set of configuration options, for making sure
405 it will work in your environment.
408 it will work in your environment.
406
409
407 Examples include:
410 Examples include:
408
411
409 LocalControllerLauncher : start engines locally as subprocesses
412 LocalControllerLauncher : start engines locally as subprocesses
410 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
413 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
411 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
414 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
412 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
415 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
413 SSHControllerLauncher : use SSH to start the controller
416 SSHControllerLauncher : use SSH to start the controller
414 WindowsHPCControllerLauncher : use Windows HPC
417 WindowsHPCControllerLauncher : use Windows HPC
415 """
418 """
416 )
419 )
417 reset = Bool(False, config=True,
420 reset = Bool(False, config=True,
418 help="Whether to reset config files as part of '--create'."
421 help="Whether to reset config files as part of '--create'."
419 )
422 )
420
423
421 # flags = Dict(flags)
424 # flags = Dict(flags)
422 aliases = Dict(start_aliases)
425 aliases = Dict(start_aliases)
423
426
424 def init_launchers(self):
427 def init_launchers(self):
425 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
428 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
426 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
429 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
427 self.controller_launcher.on_stop(self.stop_launchers)
430 self.controller_launcher.on_stop(self.stop_launchers)
428
431
429 def start_controller(self):
432 def start_controller(self):
430 self.controller_launcher.start()
433 self.controller_launcher.start()
431
434
432 def stop_controller(self):
435 def stop_controller(self):
433 # self.log.info("In stop_controller")
436 # self.log.info("In stop_controller")
434 if self.controller_launcher and self.controller_launcher.running:
437 if self.controller_launcher and self.controller_launcher.running:
435 return self.controller_launcher.stop()
438 return self.controller_launcher.stop()
436
439
437 def stop_launchers(self, r=None):
440 def stop_launchers(self, r=None):
438 if not self._stopping:
441 if not self._stopping:
439 self.stop_controller()
442 self.stop_controller()
440 super(IPClusterStart, self).stop_launchers()
443 super(IPClusterStart, self).stop_launchers()
441
444
442 def start(self):
445 def start(self):
443 """Start the app for the start subcommand."""
446 """Start the app for the start subcommand."""
444 # First see if the cluster is already running
447 # First see if the cluster is already running
445 try:
448 try:
446 pid = self.get_pid_from_file()
449 pid = self.get_pid_from_file()
447 except PIDFileError:
450 except PIDFileError:
448 pass
451 pass
449 else:
452 else:
450 if self.check_pid(pid):
453 if self.check_pid(pid):
451 self.log.critical(
454 self.log.critical(
452 'Cluster is already running with [pid=%s]. '
455 'Cluster is already running with [pid=%s]. '
453 'use "ipcluster stop" to stop the cluster.' % pid
456 'use "ipcluster stop" to stop the cluster.' % pid
454 )
457 )
455 # Here I exit with a unusual exit status that other processes
458 # Here I exit with a unusual exit status that other processes
456 # can watch for to learn how I existed.
459 # can watch for to learn how I existed.
457 self.exit(ALREADY_STARTED)
460 self.exit(ALREADY_STARTED)
458 else:
461 else:
459 self.remove_pid_file()
462 self.remove_pid_file()
460
463
461
464
462 # Now log and daemonize
465 # Now log and daemonize
463 self.log.info(
466 self.log.info(
464 'Starting ipcluster with [daemon=%r]' % self.daemonize
467 'Starting ipcluster with [daemon=%r]' % self.daemonize
465 )
468 )
466 # TODO: Get daemonize working on Windows or as a Windows Server.
469 # TODO: Get daemonize working on Windows or as a Windows Server.
467 if self.daemonize:
470 if self.daemonize:
468 if os.name=='posix':
471 if os.name=='posix':
469 daemonize()
472 daemonize()
470
473
471 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
474 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
472 dc.start()
475 dc.start()
473 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
476 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
474 dc.start()
477 dc.start()
475 # Now write the new pid file AFTER our new forked pid is active.
478 # Now write the new pid file AFTER our new forked pid is active.
476 self.write_pid_file()
479 self.write_pid_file()
477 try:
480 try:
478 self.loop.start()
481 self.loop.start()
479 except KeyboardInterrupt:
482 except KeyboardInterrupt:
480 pass
483 pass
481 except zmq.ZMQError as e:
484 except zmq.ZMQError as e:
482 if e.errno == errno.EINTR:
485 if e.errno == errno.EINTR:
483 pass
486 pass
484 else:
487 else:
485 raise
488 raise
486 finally:
489 finally:
487 self.remove_pid_file()
490 self.remove_pid_file()
488
491
489 base='IPython.parallel.apps.ipclusterapp.IPCluster'
492 base='IPython.parallel.apps.ipclusterapp.IPCluster'
490
493
491 class IPClusterApp(Application):
494 class IPClusterApp(Application):
492 name = u'ipcluster'
495 name = u'ipcluster'
493 description = _description
496 description = _description
494 examples = _main_examples
497 examples = _main_examples
495
498
496 subcommands = {
499 subcommands = {
497 'start' : (base+'Start', start_help),
500 'start' : (base+'Start', start_help),
498 'stop' : (base+'Stop', stop_help),
501 'stop' : (base+'Stop', stop_help),
499 'engines' : (base+'Engines', engines_help),
502 'engines' : (base+'Engines', engines_help),
500 }
503 }
501
504
502 # no aliases or flags for parent App
505 # no aliases or flags for parent App
503 aliases = Dict()
506 aliases = Dict()
504 flags = Dict()
507 flags = Dict()
505
508
506 def start(self):
509 def start(self):
507 if self.subapp is None:
510 if self.subapp is None:
508 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
511 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
509 print
512 print
510 self.print_description()
513 self.print_description()
511 self.print_subcommands()
514 self.print_subcommands()
512 self.exit(1)
515 self.exit(1)
513 else:
516 else:
514 return self.subapp.start()
517 return self.subapp.start()
515
518
516 def launch_new_instance():
519 def launch_new_instance():
517 """Create and run the IPython cluster."""
520 """Create and run the IPython cluster."""
518 app = IPClusterApp.instance()
521 app = IPClusterApp.instance()
519 app.initialize()
522 app.initialize()
520 app.start()
523 app.start()
521
524
522
525
523 if __name__ == '__main__':
526 if __name__ == '__main__':
524 launch_new_instance()
527 launch_new_instance()
525
528
General Comments 0
You need to be logged in to leave comments. Login now