##// END OF EJS Templates
expose IPClusterEngines.daemonize as `--daemonize` flag....
MinRK -
Show More
@@ -1,292 +1,293 b''
1 1 # encoding: utf-8
2 2 """
3 3 An application for IPython.
4 4
5 5 All top-level applications should use the classes in this module for
6 6 handling configuration and creating componenets.
7 7
8 8 The job of an :class:`Application` is to create the master configuration
9 9 object and then create the configurable objects, passing the config to them.
10 10
11 11 Authors:
12 12
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 * Min RK
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Copyright (C) 2008-2011 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-----------------------------------------------------------------------------
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Imports
28 28 #-----------------------------------------------------------------------------
29 29
30 30 import logging
31 31 import os
32 32 import shutil
33 33 import sys
34 34
35 35 from IPython.config.application import Application
36 36 from IPython.config.configurable import Configurable
37 37 from IPython.config.loader import Config
38 38 from IPython.core import release, crashhandler
39 39 from IPython.core.profiledir import ProfileDir, ProfileDirError
40 40 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
41 41 from IPython.utils.traitlets import List, Unicode, Type, Bool, Dict
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Classes and functions
45 45 #-----------------------------------------------------------------------------
46 46
47 47
48 48 #-----------------------------------------------------------------------------
49 49 # Base Application Class
50 50 #-----------------------------------------------------------------------------
51 51
52 52 # aliases and flags
53 53
54 54 base_aliases = dict(
55 55 profile='BaseIPythonApplication.profile',
56 56 ipython_dir='BaseIPythonApplication.ipython_dir',
57 57 log_level='Application.log_level',
58 58 )
59 59
60 60 base_flags = dict(
61 61 debug = ({'Application' : {'log_level' : logging.DEBUG}},
62 62 "set log level to logging.DEBUG (maximize logging output)"),
63 63 quiet = ({'Application' : {'log_level' : logging.CRITICAL}},
64 64 "set log level to logging.CRITICAL (minimize logging output)"),
65 65 init = ({'BaseIPythonApplication' : {
66 66 'copy_config_files' : True,
67 67 'auto_create' : True}
68 68 }, "Initialize profile with default config files")
69 69 )
70 70
71 71
72 72 class BaseIPythonApplication(Application):
73 73
74 74 name = Unicode(u'ipython')
75 75 description = Unicode(u'IPython: an enhanced interactive Python shell.')
76 76 version = Unicode(release.version)
77 77
78 78 aliases = Dict(base_aliases)
79 79 flags = Dict(base_flags)
80 classes = List([ProfileDir])
80 81
81 82 # Track whether the config_file has changed,
82 83 # because some logic happens only if we aren't using the default.
83 84 config_file_specified = Bool(False)
84 85
85 86 config_file_name = Unicode(u'ipython_config.py')
86 87 def _config_file_name_default(self):
87 88 return self.name.replace('-','_') + u'_config.py'
88 89 def _config_file_name_changed(self, name, old, new):
89 90 if new != old:
90 91 self.config_file_specified = True
91 92
92 93 # The directory that contains IPython's builtin profiles.
93 94 builtin_profile_dir = Unicode(
94 95 os.path.join(get_ipython_package_dir(), u'config', u'profile', u'default')
95 96 )
96 97
97 98 config_file_paths = List(Unicode)
98 99 def _config_file_paths_default(self):
99 100 return [os.getcwdu()]
100 101
101 102 profile = Unicode(u'default', config=True,
102 103 help="""The IPython profile to use."""
103 104 )
104 105 def _profile_changed(self, name, old, new):
105 106 self.builtin_profile_dir = os.path.join(
106 107 get_ipython_package_dir(), u'config', u'profile', new
107 108 )
108 109
109 110 ipython_dir = Unicode(get_ipython_dir(), config=True,
110 111 help="""
111 112 The name of the IPython directory. This directory is used for logging
112 113 configuration (through profiles), history storage, etc. The default
113 114 is usually $HOME/.ipython. This options can also be specified through
114 115 the environment variable IPYTHON_DIR.
115 116 """
116 117 )
117 118
118 119 overwrite = Bool(False, config=True,
119 120 help="""Whether to overwrite existing config files when copying""")
120 121 auto_create = Bool(False, config=True,
121 122 help="""Whether to create profile dir if it doesn't exist""")
122 123
123 124 config_files = List(Unicode)
124 125 def _config_files_default(self):
125 126 return [u'ipython_config.py']
126 127
127 128 copy_config_files = Bool(False, config=True,
128 129 help="""Whether to install the default config files into the profile dir.
129 130 If a new profile is being created, and IPython contains config files for that
130 131 profile, then they will be staged into the new directory. Otherwise,
131 132 default config files will be automatically generated.
132 133 """)
133 134
134 135 # The class to use as the crash handler.
135 136 crash_handler_class = Type(crashhandler.CrashHandler)
136 137
137 138 def __init__(self, **kwargs):
138 139 super(BaseIPythonApplication, self).__init__(**kwargs)
139 140 # ensure even default IPYTHON_DIR exists
140 141 if not os.path.exists(self.ipython_dir):
141 142 self._ipython_dir_changed('ipython_dir', self.ipython_dir, self.ipython_dir)
142 143
143 144 #-------------------------------------------------------------------------
144 145 # Various stages of Application creation
145 146 #-------------------------------------------------------------------------
146 147
147 148 def init_crash_handler(self):
148 149 """Create a crash handler, typically setting sys.excepthook to it."""
149 150 self.crash_handler = self.crash_handler_class(self)
150 151 sys.excepthook = self.crash_handler
151 152
152 153 def _ipython_dir_changed(self, name, old, new):
153 154 if old in sys.path:
154 155 sys.path.remove(old)
155 156 sys.path.append(os.path.abspath(new))
156 157 if not os.path.isdir(new):
157 158 os.makedirs(new, mode=0777)
158 159 readme = os.path.join(new, 'README')
159 160 if not os.path.exists(readme):
160 161 path = os.path.join(get_ipython_package_dir(), u'config', u'profile')
161 162 shutil.copy(os.path.join(path, 'README'), readme)
162 163 self.log.debug("IPYTHON_DIR set to: %s" % new)
163 164
164 165 def load_config_file(self, suppress_errors=True):
165 166 """Load the config file.
166 167
167 168 By default, errors in loading config are handled, and a warning
168 169 printed on screen. For testing, the suppress_errors option is set
169 170 to False, so errors will make tests fail.
170 171 """
171 172 base_config = 'ipython_config.py'
172 173 self.log.debug("Attempting to load config file: %s" %
173 174 base_config)
174 175 try:
175 176 Application.load_config_file(
176 177 self,
177 178 base_config,
178 179 path=self.config_file_paths
179 180 )
180 181 except IOError:
181 182 # ignore errors loading parent
182 183 pass
183 184 if self.config_file_name == base_config:
184 185 # don't load secondary config
185 186 return
186 187 self.log.debug("Attempting to load config file: %s" %
187 188 self.config_file_name)
188 189 try:
189 190 Application.load_config_file(
190 191 self,
191 192 self.config_file_name,
192 193 path=self.config_file_paths
193 194 )
194 195 except IOError:
195 196 # Only warn if the default config file was NOT being used.
196 197 if self.config_file_specified:
197 198 self.log.warn("Config file not found, skipping: %s" %
198 199 self.config_file_name)
199 200 except:
200 201 # For testing purposes.
201 202 if not suppress_errors:
202 203 raise
203 204 self.log.warn("Error loading config file: %s" %
204 205 self.config_file_name, exc_info=True)
205 206
206 207 def init_profile_dir(self):
207 208 """initialize the profile dir"""
208 209 try:
209 210 # location explicitly specified:
210 211 location = self.config.ProfileDir.location
211 212 except AttributeError:
212 213 # location not specified, find by profile name
213 214 try:
214 215 p = ProfileDir.find_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
215 216 except ProfileDirError:
216 217 # not found, maybe create it (always create default profile)
217 218 if self.auto_create or self.profile=='default':
218 219 try:
219 220 p = ProfileDir.create_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
220 221 except ProfileDirError:
221 222 self.log.fatal("Could not create profile: %r"%self.profile)
222 223 self.exit(1)
223 224 else:
224 225 self.log.info("Created profile dir: %r"%p.location)
225 226 else:
226 227 self.log.fatal("Profile %r not found."%self.profile)
227 228 self.exit(1)
228 229 else:
229 230 self.log.info("Using existing profile dir: %r"%p.location)
230 231 else:
231 232 # location is fully specified
232 233 try:
233 234 p = ProfileDir.find_profile_dir(location, self.config)
234 235 except ProfileDirError:
235 236 # not found, maybe create it
236 237 if self.auto_create:
237 238 try:
238 239 p = ProfileDir.create_profile_dir(location, self.config)
239 240 except ProfileDirError:
240 241 self.log.fatal("Could not create profile directory: %r"%location)
241 242 self.exit(1)
242 243 else:
243 244 self.log.info("Creating new profile dir: %r"%location)
244 245 else:
245 246 self.log.fatal("Profile directory %r not found."%location)
246 247 self.exit(1)
247 248 else:
248 249 self.log.info("Using existing profile dir: %r"%location)
249 250
250 251 self.profile_dir = p
251 252 self.config_file_paths.append(p.location)
252 253
253 254 def init_config_files(self):
254 255 """[optionally] copy default config files into profile dir."""
255 256 # copy config files
256 257 if self.copy_config_files:
257 258 path = self.builtin_profile_dir
258 259 src = self.profile
259 260
260 261 cfg = self.config_file_name
261 262 if path and os.path.exists(os.path.join(path, cfg)):
262 263 self.log.warn("Staging %r from %s into %r [overwrite=%s]"%(
263 264 cfg, src, self.profile_dir.location, self.overwrite)
264 265 )
265 266 self.profile_dir.copy_config_file(cfg, path=path, overwrite=self.overwrite)
266 267 else:
267 268 self.stage_default_config_file()
268 269
269 270 def stage_default_config_file(self):
270 271 """auto generate default config file, and stage it into the profile."""
271 272 s = self.generate_config_file()
272 273 fname = os.path.join(self.profile_dir.location, self.config_file_name)
273 274 if self.overwrite or not os.path.exists(fname):
274 275 self.log.warn("Generating default config file: %r"%(fname))
275 276 with open(fname, 'w') as f:
276 277 f.write(s)
277 278
278 279
279 280 def initialize(self, argv=None):
280 281 # don't hook up crash handler before parsing command-line
281 282 self.parse_command_line(argv)
282 283 self.init_crash_handler()
283 284 if self.subapp is not None:
284 285 # stop here if subapp is taking over
285 286 return
286 287 cl_config = self.config
287 288 self.init_profile_dir()
288 289 self.init_config_files()
289 290 self.load_config_file()
290 291 # enforce cl-opts override configfile opts:
291 292 self.update_config(cl_config)
292 293
@@ -1,447 +1,458 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 41 DottedObjectName)
42 42
43 43 from IPython.parallel.apps.baseapp import (
44 44 BaseParallelApplication,
45 45 PIDFileError,
46 46 base_flags, base_aliases
47 47 )
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
54 54
55 55 default_config_file_name = u'ipcluster_config.py'
56 56
57 57
58 58 _description = """Start an IPython cluster for parallel computing.
59 59
60 60 An IPython cluster consists of 1 controller and 1 or more engines.
61 61 This command automates the startup of these processes using a wide
62 62 range of startup methods (SSH, local processes, PBS, mpiexec,
63 63 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 64 local host simply do 'ipcluster start n=4'. For more complex usage
65 65 you will typically do 'ipcluster create profile=mycluster', then edit
66 66 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
67 67 """
68 68
69 69
70 70 # Exit codes for ipcluster
71 71
72 72 # This will be the exit code if the ipcluster appears to be running because
73 73 # a .pid file exists
74 74 ALREADY_STARTED = 10
75 75
76 76
77 77 # This will be the exit code if ipcluster stop is run, but there is not .pid
78 78 # file to be found.
79 79 ALREADY_STOPPED = 11
80 80
81 81 # This will be the exit code if ipcluster engines is run, but there is not .pid
82 82 # file to be found.
83 83 NO_CLUSTER = 12
84 84
85 85
86 86 #-----------------------------------------------------------------------------
87 87 # Main application
88 88 #-----------------------------------------------------------------------------
89 89 start_help = """Start an IPython cluster for parallel computing
90 90
91 91 Start an ipython cluster by its profile name or cluster
92 92 directory. Cluster directories contain configuration, log and
93 93 security related files and are named using the convention
94 94 'profile_<name>' and should be creating using the 'start'
95 95 subcommand of 'ipcluster'. If your cluster directory is in
96 96 the cwd or the ipython directory, you can simply refer to it
97 97 using its profile name, 'ipcluster start n=4 profile=<profile>`,
98 98 otherwise use the 'profile_dir' option.
99 99 """
100 100 stop_help = """Stop a running IPython cluster
101 101
102 102 Stop a running ipython cluster by its profile name or cluster
103 103 directory. Cluster directories are named using the convention
104 104 'profile_<name>'. If your cluster directory is in
105 105 the cwd or the ipython directory, you can simply refer to it
106 106 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
107 107 use the 'profile_dir' option.
108 108 """
109 109 engines_help = """Start engines connected to an existing IPython cluster
110 110
111 111 Start one or more engines to connect to an existing Cluster
112 112 by profile name or cluster directory.
113 113 Cluster directories contain configuration, log and
114 114 security related files and are named using the convention
115 115 'profile_<name>' and should be creating using the 'start'
116 116 subcommand of 'ipcluster'. If your cluster directory is in
117 117 the cwd or the ipython directory, you can simply refer to it
118 118 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
119 119 otherwise use the 'profile_dir' option.
120 120 """
121 121 stop_aliases = dict(
122 122 signal='IPClusterStop.signal',
123 profile='BaseIPythonApplication.profile',
124 profile_dir='ProfileDir.location',
125 123 )
124 stop_aliases.update(base_aliases)
126 125
127 126 class IPClusterStop(BaseParallelApplication):
128 127 name = u'ipcluster'
129 128 description = stop_help
130 129 config_file_name = Unicode(default_config_file_name)
131 130
132 131 signal = Int(signal.SIGINT, config=True,
133 132 help="signal to use for stopping processes.")
134 133
135 134 aliases = Dict(stop_aliases)
136 135
137 136 def start(self):
138 137 """Start the app for the stop subcommand."""
139 138 try:
140 139 pid = self.get_pid_from_file()
141 140 except PIDFileError:
142 141 self.log.critical(
143 142 'Could not read pid file, cluster is probably not running.'
144 143 )
145 144 # Here I exit with a unusual exit status that other processes
146 145 # can watch for to learn how I existed.
147 146 self.remove_pid_file()
148 147 self.exit(ALREADY_STOPPED)
149 148
150 149 if not self.check_pid(pid):
151 150 self.log.critical(
152 151 'Cluster [pid=%r] is not running.' % pid
153 152 )
154 153 self.remove_pid_file()
155 154 # Here I exit with a unusual exit status that other processes
156 155 # can watch for to learn how I existed.
157 156 self.exit(ALREADY_STOPPED)
158 157
159 158 elif os.name=='posix':
160 159 sig = self.signal
161 160 self.log.info(
162 161 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
163 162 )
164 163 try:
165 164 os.kill(pid, sig)
166 165 except OSError:
167 166 self.log.error("Stopping cluster failed, assuming already dead.",
168 167 exc_info=True)
169 168 self.remove_pid_file()
170 169 elif os.name=='nt':
171 170 try:
172 171 # kill the whole tree
173 172 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
174 173 except (CalledProcessError, OSError):
175 174 self.log.error("Stopping cluster failed, assuming already dead.",
176 175 exc_info=True)
177 176 self.remove_pid_file()
178 177
179 178 engine_aliases = {}
180 179 engine_aliases.update(base_aliases)
181 180 engine_aliases.update(dict(
182 181 n='IPClusterEngines.n',
183 182 elauncher = 'IPClusterEngines.engine_launcher_class',
183 daemonize = 'IPClusterEngines.daemonize',
184 ))
185 engine_flags = {}
186 engine_flags.update(base_flags)
187
188 engine_flags.update(dict(
189 daemonize=(
190 {'IPClusterEngines' : {'daemonize' : True}},
191 """run the cluster into the background (not available on Windows)""",
192 )
184 193 ))
185 194 class IPClusterEngines(BaseParallelApplication):
186 195
187 196 name = u'ipcluster'
188 197 description = engines_help
189 198 usage = None
190 199 config_file_name = Unicode(default_config_file_name)
191 200 default_log_level = logging.INFO
192 201 classes = List()
193 202 def _classes_default(self):
194 203 from IPython.parallel.apps import launcher
195 204 launchers = launcher.all_launchers
196 205 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
197 206 return [ProfileDir]+eslaunchers
198 207
199 208 n = Int(2, config=True,
200 209 help="The number of engines to start.")
201 210
202 211 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
203 212 config=True,
204 213 help="The class for launching a set of Engines."
205 214 )
206 215 daemonize = Bool(False, config=True,
207 help='Daemonize the ipcluster program. This implies --log-to-file')
216 help="""Daemonize the ipcluster program. This implies --log-to-file.
217 Not available on Windows.
218 """)
208 219
209 220 def _daemonize_changed(self, name, old, new):
210 221 if new:
211 222 self.log_to_file = True
212 223
213 224 aliases = Dict(engine_aliases)
214 # flags = Dict(flags)
225 flags = Dict(engine_flags)
215 226 _stopping = False
216 227
217 228 def initialize(self, argv=None):
218 229 super(IPClusterEngines, self).initialize(argv)
219 230 self.init_signal()
220 231 self.init_launchers()
221 232
222 233 def init_launchers(self):
223 234 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
224 235 self.engine_launcher.on_stop(lambda r: self.loop.stop())
225 236
226 237 def init_signal(self):
227 238 # Setup signals
228 239 signal.signal(signal.SIGINT, self.sigint_handler)
229 240
230 241 def build_launcher(self, clsname):
231 242 """import and instantiate a Launcher based on importstring"""
232 243 if '.' not in clsname:
233 244 # not a module, presume it's the raw name in apps.launcher
234 245 clsname = 'IPython.parallel.apps.launcher.'+clsname
235 246 # print repr(clsname)
236 247 klass = import_item(clsname)
237 248
238 249 launcher = klass(
239 250 work_dir=self.profile_dir.location, config=self.config, log=self.log
240 251 )
241 252 return launcher
242 253
243 254 def start_engines(self):
244 255 self.log.info("Starting %i engines"%self.n)
245 256 self.engine_launcher.start(
246 257 self.n,
247 258 self.profile_dir.location
248 259 )
249 260
250 261 def stop_engines(self):
251 262 self.log.info("Stopping Engines...")
252 263 if self.engine_launcher.running:
253 264 d = self.engine_launcher.stop()
254 265 return d
255 266 else:
256 267 return None
257 268
258 269 def stop_launchers(self, r=None):
259 270 if not self._stopping:
260 271 self._stopping = True
261 272 self.log.error("IPython cluster: stopping")
262 273 self.stop_engines()
263 274 # Wait a few seconds to let things shut down.
264 275 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
265 276 dc.start()
266 277
267 278 def sigint_handler(self, signum, frame):
268 279 self.log.debug("SIGINT received, stopping launchers...")
269 280 self.stop_launchers()
270 281
271 282 def start_logging(self):
272 283 # Remove old log files of the controller and engine
273 284 if self.clean_logs:
274 285 log_dir = self.profile_dir.log_dir
275 286 for f in os.listdir(log_dir):
276 287 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
277 288 os.remove(os.path.join(log_dir, f))
278 289 # This will remove old log files for ipcluster itself
279 290 # super(IPBaseParallelApplication, self).start_logging()
280 291
281 292 def start(self):
282 293 """Start the app for the engines subcommand."""
283 294 self.log.info("IPython cluster: started")
284 295 # First see if the cluster is already running
285 296
286 297 # Now log and daemonize
287 298 self.log.info(
288 299 'Starting engines with [daemon=%r]' % self.daemonize
289 300 )
290 301 # TODO: Get daemonize working on Windows or as a Windows Server.
291 302 if self.daemonize:
292 303 if os.name=='posix':
293 304 daemonize()
294 305
295 306 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
296 307 dc.start()
297 308 # Now write the new pid file AFTER our new forked pid is active.
298 309 # self.write_pid_file()
299 310 try:
300 311 self.loop.start()
301 312 except KeyboardInterrupt:
302 313 pass
303 314 except zmq.ZMQError as e:
304 315 if e.errno == errno.EINTR:
305 316 pass
306 317 else:
307 318 raise
308 319
309 320 start_aliases = {}
310 321 start_aliases.update(engine_aliases)
311 322 start_aliases.update(dict(
312 323 delay='IPClusterStart.delay',
313 324 clean_logs='IPClusterStart.clean_logs',
314 325 ))
315 326
316 327 class IPClusterStart(IPClusterEngines):
317 328
318 329 name = u'ipcluster'
319 330 description = start_help
320 331 default_log_level = logging.INFO
321 332 auto_create = Bool(True, config=True,
322 333 help="whether to create the profile_dir if it doesn't exist")
323 334 classes = List()
324 335 def _classes_default(self,):
325 336 from IPython.parallel.apps import launcher
326 337 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
327 338
328 339 clean_logs = Bool(True, config=True,
329 340 help="whether to cleanup old logs before starting")
330 341
331 342 delay = CFloat(1., config=True,
332 343 help="delay (in s) between starting the controller and the engines")
333 344
334 345 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
335 346 config=True,
336 347 help="The class for launching a Controller."
337 348 )
338 349 reset = Bool(False, config=True,
339 350 help="Whether to reset config files as part of '--create'."
340 351 )
341 352
342 353 # flags = Dict(flags)
343 354 aliases = Dict(start_aliases)
344 355
345 356 def init_launchers(self):
346 357 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
347 358 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
348 359 self.controller_launcher.on_stop(self.stop_launchers)
349 360
350 361 def start_controller(self):
351 362 self.controller_launcher.start(
352 363 self.profile_dir.location
353 364 )
354 365
355 366 def stop_controller(self):
356 367 # self.log.info("In stop_controller")
357 368 if self.controller_launcher and self.controller_launcher.running:
358 369 return self.controller_launcher.stop()
359 370
360 371 def stop_launchers(self, r=None):
361 372 if not self._stopping:
362 373 self.stop_controller()
363 374 super(IPClusterStart, self).stop_launchers()
364 375
365 376 def start(self):
366 377 """Start the app for the start subcommand."""
367 378 # First see if the cluster is already running
368 379 try:
369 380 pid = self.get_pid_from_file()
370 381 except PIDFileError:
371 382 pass
372 383 else:
373 384 if self.check_pid(pid):
374 385 self.log.critical(
375 386 'Cluster is already running with [pid=%s]. '
376 387 'use "ipcluster stop" to stop the cluster.' % pid
377 388 )
378 389 # Here I exit with a unusual exit status that other processes
379 390 # can watch for to learn how I existed.
380 391 self.exit(ALREADY_STARTED)
381 392 else:
382 393 self.remove_pid_file()
383 394
384 395
385 396 # Now log and daemonize
386 397 self.log.info(
387 398 'Starting ipcluster with [daemon=%r]' % self.daemonize
388 399 )
389 400 # TODO: Get daemonize working on Windows or as a Windows Server.
390 401 if self.daemonize:
391 402 if os.name=='posix':
392 403 daemonize()
393 404
394 405 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
395 406 dc.start()
396 407 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
397 408 dc.start()
398 409 # Now write the new pid file AFTER our new forked pid is active.
399 410 self.write_pid_file()
400 411 try:
401 412 self.loop.start()
402 413 except KeyboardInterrupt:
403 414 pass
404 415 except zmq.ZMQError as e:
405 416 if e.errno == errno.EINTR:
406 417 pass
407 418 else:
408 419 raise
409 420 finally:
410 421 self.remove_pid_file()
411 422
412 423 base='IPython.parallel.apps.ipclusterapp.IPCluster'
413 424
414 425 class IPClusterApp(Application):
415 426 name = u'ipcluster'
416 427 description = _description
417 428
418 429 subcommands = {
419 430 'start' : (base+'Start', start_help),
420 431 'stop' : (base+'Stop', stop_help),
421 432 'engines' : (base+'Engines', engines_help),
422 433 }
423 434
424 435 # no aliases or flags for parent App
425 436 aliases = Dict()
426 437 flags = Dict()
427 438
428 439 def start(self):
429 440 if self.subapp is None:
430 441 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
431 442 print
432 443 self.print_description()
433 444 self.print_subcommands()
434 445 self.exit(1)
435 446 else:
436 447 return self.subapp.start()
437 448
438 449 def launch_new_instance():
439 450 """Create and run the IPython cluster."""
440 451 app = IPClusterApp.instance()
441 452 app.initialize()
442 453 app.start()
443 454
444 455
445 456 if __name__ == '__main__':
446 457 launch_new_instance()
447 458
General Comments 0
You need to be logged in to leave comments. Login now