##// END OF EJS Templates
General work on the controller/engine/cluster startup....
Brian Granger -
Show More
@@ -343,13 +343,13 b' class Application(object):'
343 343 self._exiting = True
344 344 sys.exit(1)
345 345
346 def exit(self):
346 def exit(self, exit_status=0):
347 347 if self._exiting:
348 348 pass
349 349 else:
350 350 self.log.debug("Exiting application: %s" % self.name)
351 351 self._exiting = True
352 sys.exit(1)
352 sys.exit(exit_status)
353 353
354 354 def attempt(self, func, action='abort'):
355 355 try:
@@ -360,5 +360,5 b' class Application(object):'
360 360 if action == 'abort':
361 361 self.abort()
362 362 elif action == 'exit':
363 self.exit()
363 self.exit(0)
364 364
@@ -459,3 +459,5 b' class ApplicationWithClusterDir(Application):'
459 459 return pid
460 460 else:
461 461 raise PIDFileError('pid file not found: %s' % pid_file)
462
463
@@ -206,6 +206,13 b' class FCServiceFactory(AdaptedConfiguredObjectFactory):'
206 206 for ff in furl_files:
207 207 fullfile = self._get_security_file(ff)
208 208 if self.reuse_furls:
209 if self.port==0:
210 raise FURLError("You are trying to reuse the FURL file "
211 "for this connection, but the port for this connection "
212 "is set to 0 (autoselect). To reuse the FURL file "
213 "you need to specify specific port to listen on."
214 )
215 else:
209 216 log.msg("Reusing FURL file: %s" % fullfile)
210 217 else:
211 218 if os.path.isfile(fullfile):
@@ -32,18 +32,24 b' from IPython.kernel.clusterdir import ('
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 from twisted.internet import reactor, defer
35 from twisted.internet import reactor
36 36 from twisted.python import log
37 37
38
38 39 #-----------------------------------------------------------------------------
39 # Code for launchers
40 # The ipcluster application
40 41 #-----------------------------------------------------------------------------
41 42
42 43
44 # Exit codes for ipcluster
43 45
44 #-----------------------------------------------------------------------------
45 # The ipcluster application
46 #-----------------------------------------------------------------------------
46 # This will be the exit code if the ipcluster appears to be running because
47 # a .pid file exists
48 ALREADY_STARTED = 10
49
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 # file to be found.
52 ALREADY_STOPPED = 11
47 53
48 54
49 55 class IPClusterCLLoader(ArgParseConfigLoader):
@@ -151,13 +157,14 b' class IPClusterCLLoader(ArgParseConfigLoader):'
151 157 help='Stop a cluster.',
152 158 parents=[parent_parser1, parent_parser2]
153 159 )
154 parser_start.add_argument('--signal-number',
155 dest='Global.stop_signal', type=int,
160 parser_start.add_argument('--signal',
161 dest='Global.signal', type=int,
156 162 help="The signal number to use in stopping the cluster (default=2).",
157 metavar="Global.stop_signal",
163 metavar="Global.signal",
158 164 default=NoConfigDefault
159 165 )
160 166
167
161 168 default_config_file_name = 'ipcluster_config.py'
162 169
163 170
@@ -178,7 +185,7 b' class IPClusterApp(ApplicationWithClusterDir):'
178 185 self.default_config.Global.n = 2
179 186 self.default_config.Global.reset_config = False
180 187 self.default_config.Global.clean_logs = True
181 self.default_config.Global.stop_signal = 2
188 self.default_config.Global.signal = 2
182 189 self.default_config.Global.daemonize = False
183 190
184 191 def create_command_line_config(self):
@@ -209,28 +216,6 b' class IPClusterApp(ApplicationWithClusterDir):'
209 216 "information about creating and listing cluster dirs."
210 217 )
211 218
212 def pre_construct(self):
213 super(IPClusterApp, self).pre_construct()
214 config = self.master_config
215 try:
216 daemon = config.Global.daemonize
217 if daemon:
218 config.Global.log_to_file = True
219 except AttributeError:
220 pass
221
222 def construct(self):
223 config = self.master_config
224 if config.Global.subcommand=='list':
225 pass
226 elif config.Global.subcommand=='create':
227 self.log.info('Copying default config files to cluster directory '
228 '[overwrite=%r]' % (config.Global.reset_config,))
229 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
230 elif config.Global.subcommand=='start':
231 self.start_logging()
232 reactor.callWhenRunning(self.start_launchers)
233
234 219 def list_cluster_dirs(self):
235 220 # Find the search paths
236 221 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
@@ -256,6 +241,28 b' class IPClusterApp(ApplicationWithClusterDir):'
256 241 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
257 242 print start_cmd + " ==> " + full_path
258 243
244 def pre_construct(self):
245 super(IPClusterApp, self).pre_construct()
246 config = self.master_config
247 try:
248 daemon = config.Global.daemonize
249 if daemon:
250 config.Global.log_to_file = True
251 except AttributeError:
252 pass
253
254 def construct(self):
255 config = self.master_config
256 if config.Global.subcommand=='list':
257 pass
258 elif config.Global.subcommand=='create':
259 self.log.info('Copying default config files to cluster directory '
260 '[overwrite=%r]' % (config.Global.reset_config,))
261 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
262 elif config.Global.subcommand=='start':
263 self.start_logging()
264 reactor.callWhenRunning(self.start_launchers)
265
259 266 def start_launchers(self):
260 267 config = self.master_config
261 268
@@ -326,15 +333,21 b' class IPClusterApp(ApplicationWithClusterDir):'
326 333
327 334 def start_app(self):
328 335 """Start the application, depending on what subcommand is used."""
329 config = self.master_config
330 subcmd = config.Global.subcommand
336 subcmd = self.master_config.Global.subcommand
331 337 if subcmd=='create' or subcmd=='list':
332 338 return
333 339 elif subcmd=='start':
340 self.start_app_start()
341 elif subcmd=='stop':
342 self.start_app_stop()
343
344 def start_app_start(self):
345 """Start the app for the start subcommand."""
346 config = self.master_config
334 347 # First see if the cluster is already running
335 348 try:
336 349 pid = self.get_pid_from_file()
337 except:
350 except PIDFileError:
338 351 pass
339 352 else:
340 353 self.log.critical(
@@ -343,20 +356,26 b' class IPClusterApp(ApplicationWithClusterDir):'
343 356 )
344 357 # Here I exit with a unusual exit status that other processes
345 358 # can watch for to learn how I existed.
346 sys.exit(10)
359 self.exit(ALREADY_STARTED)
360
347 361 # Now log and daemonize
348 self.log.info('Starting ipcluster with [daemon=%r]' % config.Global.daemonize)
362 self.log.info(
363 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
364 )
349 365 if config.Global.daemonize:
350 366 if os.name=='posix':
351 os.chdir(config.Global.cluster_dir)
352 self.log_level = 40
353 367 daemonize()
354 368
355 # Now write the new pid file after our new forked pid is active.
369 # Now write the new pid file AFTER our new forked pid is active.
356 370 self.write_pid_file()
371 # cd to the cluster_dir as our working directory.
372 os.chdir(config.Global.cluster_dir)
357 373 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
358 374 reactor.run()
359 elif subcmd=='stop':
375
376 def start_app_stop(self):
377 """Start the app for the stop subcommand."""
378 config = self.master_config
360 379 try:
361 380 pid = self.get_pid_from_file()
362 381 except PIDFileError:
@@ -365,8 +384,8 b' class IPClusterApp(ApplicationWithClusterDir):'
365 384 )
366 385 # Here I exit with a unusual exit status that other processes
367 386 # can watch for to learn how I existed.
368 sys.exit(11)
369 sig = config.Global.stop_signal
387 self.exit(ALREADY_STOPPED)
388 sig = config.Global.signal
370 389 self.log.info(
371 390 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
372 391 )
@@ -244,9 +244,15 b' class IPControllerApp(ApplicationWithClusterDir):'
244 244 log.msg("Error running statement: %s" % s)
245 245
246 246 def start_app(self):
247 # Start the controller service and set things running
247 # Start the controller service.
248 248 self.main_service.startService()
249 # Write the .pid file overwriting old ones. This allow multiple
250 # controllers to clober each other. But Windows is not cleaning
251 # these up properly.
249 252 self.write_pid_file(overwrite=True)
253 # cd to the cluster_dir as our working directory.
254 os.chdir(self.master_config.Global.cluster_dir)
255 # Add a trigger to delete the .pid file upon shutting down.
250 256 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
251 257 reactor.run()
252 258
@@ -224,7 +224,8 b' class IPEngineApp(ApplicationWithClusterDir):'
224 224 log.msg("Error executing statement: %s" % line)
225 225
226 226 def start_app(self):
227 # Start the controller service and set things running
227 # cd to the cluster_dir as our working directory.
228 os.chdir(self.master_config.Global.cluster_dir)
228 229 reactor.run()
229 230
230 231
General Comments 0
You need to be logged in to leave comments. Login now