##// END OF EJS Templates
fix typo in ipcluster help
MinRK -
Show More
@@ -1,529 +1,529 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------

# Long help text for each subcommand. Each string is used both as the
# `description` of the matching application class and as the help entry in
# IPClusterApp.subcommands.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>', otherwise
use the '--profile-dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
otherwise use the '--profile-dir' option.
"""
# Command-line aliases for 'ipcluster stop': the shared base aliases plus
# 'signal', which maps onto IPClusterStop.signal.
stop_aliases = dict(
    signal='IPClusterStop.signal',
)
stop_aliases.update(base_aliases)


# Application behind the 'ipcluster stop' subcommand: reads the cluster's pid
# file and signals (posix) or taskkills (nt) the recorded process.
class IPClusterStop(BaseParallelApplication):
    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples
    config_file_name = Unicode(default_config_file_name)

    # The default refers to the `signal` module; after this line the class
    # attribute `signal` is the trait.
    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED when the pid file cannot be read or the
        recorded pid is not running. Otherwise the process is stopped and the
        pid file is removed; a failure to stop is logged and treated as
        "already dead".
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree.
                # check_call() never reads from pipes, so the output is sent
                # to the null device instead of PIPE to avoid blocking the
                # child on a full pipe buffer.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, EnvironmentError):
                # EnvironmentError covers OSError (taskkill could not be run)
                # and IOError (null device could not be opened).
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
200
200
# Command-line aliases for 'ipcluster engines': a copy of the shared base
# aliases extended with the engine-specific shortcuts.
engine_aliases = dict(base_aliases)
engine_aliases.update(
    n='IPClusterEngines.n',
    engines='IPClusterEngines.engine_launcher_class',
    daemonize='IPClusterEngines.daemonize',
)

# Command-line flags for 'ipcluster engines': a copy of the shared base flags
# plus a 'daemonize' flag that sets IPClusterEngines.daemonize to True.
engine_flags = dict(base_flags)
engine_flags['daemonize'] = (
    {'IPClusterEngines' : {'daemonize' : True}},
    """run the cluster into the background (not available on Windows)""",
)
# Application behind the 'ipcluster engines' subcommand: builds an engine-set
# launcher, starts `n` engines from inside the event loop, and stops them on
# SIGINT.
class IPClusterEngines(BaseParallelApplication):

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()
    def _classes_default(self):
        """Default for `classes`: ProfileDir plus every launcher class whose
        name contains 'EngineSet'."""
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir]+eslaunchers

    n = Int(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify its absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher'.

        Examples include:

            LocalEngineSetLauncher : start engines locally as subprocesses [default]
            MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
            PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
            SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
            SSHEngineSetLauncher : use SSH to start engines
                                    Note that SSH does *not* move the connection files
                                    around, so you will likely have to do this manually
                                    unless the machines are on a shared file system.
            WindowsHPCEngineSetLauncher : use Windows HPC
        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Turn on `log_to_file` whenever `daemonize` is set to a true value."""
        if new:
            self.log_to_file = True

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)
    # Set by stop_launchers() so that the shutdown sequence runs only once.
    _stopping = False

    def initialize(self, argv=None):
        """Run the base initialization, then install the SIGINT handler and
        build the launcher(s)."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Build the engine launcher and stop the event loop when it stops."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        """Route SIGINT to sigint_handler()."""
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """import and instantiate a Launcher based on importstring

        A name without a dot is looked up in IPython.parallel.apps.launcher.
        If the import fails, a fatal message is logged and the app exits
        with status 1.
        """
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            clsname = 'IPython.parallel.apps.launcher.'+clsname
        try:
            klass = import_item(clsname)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', config=self.config, log=self.log
        )
        return launcher

    def start_engines(self):
        """Ask the engine launcher to start `n` engines for this profile dir."""
        self.log.info("Starting %i engines"%self.n)
        self.engine_launcher.start(
            self.n,
            self.profile_dir.location
        )

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's stop() returns, or None when the
        launcher is not running.
        """
        self.log.info("Stopping Engines...")
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def stop_launchers(self, r=None):
        """Stop the engines once, then stop the event loop after a delay.

        `r` is accepted so the method can be used as a launcher on_stop
        callback; it is not used.
        """
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: begin the shutdown sequence."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old engine/controller log files when `clean_logs` is set."""
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                # NOTE(review): this only matches names starting with
                # 'ipenginez-' or 'ipcontrollerz-'; confirm that still matches
                # the log file names the launchers actually write.
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # Log files of ipcluster itself are not touched here.

    def start(self):
        """Start the app for the engines subcommand.

        Optionally daemonizes (posix only), schedules start_engines() on the
        event loop and runs the loop until it is stopped. KeyboardInterrupt
        and an EINTR ZMQError end the loop quietly; any other ZMQError is
        re-raised.
        """
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # No pid file is written by the engines subcommand.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno == errno.EINTR:
                pass
            else:
                raise
367
367
# Command-line aliases for 'ipcluster start': a copy of the engine aliases
# extended with the start-specific shortcuts.
start_aliases = dict(engine_aliases)
start_aliases.update(
    delay='IPClusterStart.delay',
    controller='IPClusterStart.controller_launcher_class',
)
start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'

# set inherited Start keys directly, to ensure command-line args get higher priority
# than config file options.
for key, value in list(start_aliases.items()):
    if not value.startswith('IPClusterEngines'):
        continue
    start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
381
381
# Application behind the 'ipcluster start' subcommand: extends
# IPClusterEngines with a controller launcher and pid-file handling.
class IPClusterStart(IPClusterEngines):

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        """Default for `classes`: ProfileDir, IPClusterEngines and every launcher."""
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    # The keyword must be spelled `help`: a misspelled keyword is not
    # rejected, it just leaves the trait without help text.
    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Examples include:

            LocalControllerLauncher : start engines locally as subprocesses
            MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
            PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
            SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
            SSHControllerLauncher : use SSH to start the controller
            WindowsHPCControllerLauncher : use Windows HPC
        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller and engine launchers; when the controller
        stops, run the shutdown sequence."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class)
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Ask the controller launcher to start for this profile dir."""
        self.controller_launcher.start(
            self.profile_dir.location
        )

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns the launcher's stop() result, or None otherwise.
        """
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller (first call only), then run the inherited
        engine shutdown. `r` is the unused on_stop callback argument."""
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand.

        Exits with ALREADY_STARTED if the pid file names a running process;
        a stale pid file is removed. Otherwise it optionally daemonizes
        (posix only), schedules the controller immediately and the engines
        after `delay` seconds, writes the pid file and runs the event loop.
        The pid file is removed when the loop ends, however it ends.
        """
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with an unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                self.remove_pid_file()


        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        # `delay` is in seconds; DelayedCallback is given milliseconds.
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        finally:
            self.remove_pid_file()
492
492
# Common import-path prefix of the three subcommand application classes.
base='IPython.parallel.apps.ipclusterapp.IPCluster'

# Top-level 'ipcluster' application: it only dispatches to a subcommand.
class IPClusterApp(Application):
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    # subcommand name -> (import path of the application class, help text)
    subcommands = {
        'start' : (base+'Start', start_help),
        'stop' : (base+'Stop', stop_help),
        'engines' : (base+'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1
        when no subcommand was given."""
        if self.subapp is not None:
            return self.subapp.start()

        # Parenthesised single-argument prints produce the same output as
        # the Python 2 print statement.
        print("No subcommand specified. Must specify one of: %s"%(self.subcommands.keys()))
        print("")
        self.print_description()
        self.print_subcommands()
        self.exit(1)
519
519
def launch_new_instance():
    """Create and run the IPython cluster.

    Obtains the IPClusterApp instance, initializes it and starts it.
    """
    cluster_app = IPClusterApp.instance()
    cluster_app.initialize()
    cluster_app.start()


if __name__ == '__main__':
    launch_new_instance()
529
529
General Comments 0
You need to be logged in to leave comments. Login now