##// END OF EJS Templates
General work on the controller/engine/cluster startup....
Brian Granger -
Show More
@@ -343,13 +343,13 b' class Application(object):'
343 self._exiting = True
343 self._exiting = True
344 sys.exit(1)
344 sys.exit(1)
345
345
346 def exit(self):
346 def exit(self, exit_status=0):
347 if self._exiting:
347 if self._exiting:
348 pass
348 pass
349 else:
349 else:
350 self.log.debug("Exiting application: %s" % self.name)
350 self.log.debug("Exiting application: %s" % self.name)
351 self._exiting = True
351 self._exiting = True
352 sys.exit(1)
352 sys.exit(exit_status)
353
353
354 def attempt(self, func, action='abort'):
354 def attempt(self, func, action='abort'):
355 try:
355 try:
@@ -360,5 +360,5 b' class Application(object):'
360 if action == 'abort':
360 if action == 'abort':
361 self.abort()
361 self.abort()
362 elif action == 'exit':
362 elif action == 'exit':
363 self.exit()
363 self.exit(0)
364
364
@@ -458,4 +458,6 b' class ApplicationWithClusterDir(Application):'
458 pid = int(f.read().strip())
458 pid = int(f.read().strip())
459 return pid
459 return pid
460 else:
460 else:
461 raise PIDFileError('pid file not found: %s' % pid_file) No newline at end of file
461 raise PIDFileError('pid file not found: %s' % pid_file)
462
463
@@ -206,7 +206,14 b' class FCServiceFactory(AdaptedConfiguredObjectFactory):'
206 for ff in furl_files:
206 for ff in furl_files:
207 fullfile = self._get_security_file(ff)
207 fullfile = self._get_security_file(ff)
208 if self.reuse_furls:
208 if self.reuse_furls:
209 log.msg("Reusing FURL file: %s" % fullfile)
209 if self.port==0:
210 raise FURLError("You are trying to reuse the FURL file "
211 "for this connection, but the port for this connection "
212 "is set to 0 (autoselect). To reuse the FURL file "
213 "you need to specify specific port to listen on."
214 )
215 else:
216 log.msg("Reusing FURL file: %s" % fullfile)
210 else:
217 else:
211 if os.path.isfile(fullfile):
218 if os.path.isfile(fullfile):
212 log.msg("Removing old FURL file: %s" % fullfile)
219 log.msg("Removing old FURL file: %s" % fullfile)
@@ -32,18 +32,24 b' from IPython.kernel.clusterdir import ('
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor, defer
35 from twisted.internet import reactor
36 from twisted.python import log
36 from twisted.python import log
37
37
38
38 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
39 # Code for launchers
40 # The ipcluster application
40 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
41
42
42
43
44 # Exit codes for ipcluster
43
45
44 #-----------------------------------------------------------------------------
46 # This will be the exit code if the ipcluster appears to be running because
45 # The ipcluster application
47 # a .pid file exists
46 #-----------------------------------------------------------------------------
48 ALREADY_STARTED = 10
49
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 # file to be found.
52 ALREADY_STOPPED = 11
47
53
48
54
49 class IPClusterCLLoader(ArgParseConfigLoader):
55 class IPClusterCLLoader(ArgParseConfigLoader):
@@ -151,13 +157,14 b' class IPClusterCLLoader(ArgParseConfigLoader):'
151 help='Stop a cluster.',
157 help='Stop a cluster.',
152 parents=[parent_parser1, parent_parser2]
158 parents=[parent_parser1, parent_parser2]
153 )
159 )
154 parser_start.add_argument('--signal-number',
160 parser_start.add_argument('--signal',
155 dest='Global.stop_signal', type=int,
161 dest='Global.signal', type=int,
156 help="The signal number to use in stopping the cluster (default=2).",
162 help="The signal number to use in stopping the cluster (default=2).",
157 metavar="Global.stop_signal",
163 metavar="Global.signal",
158 default=NoConfigDefault
164 default=NoConfigDefault
159 )
165 )
160
166
167
161 default_config_file_name = 'ipcluster_config.py'
168 default_config_file_name = 'ipcluster_config.py'
162
169
163
170
@@ -178,7 +185,7 b' class IPClusterApp(ApplicationWithClusterDir):'
178 self.default_config.Global.n = 2
185 self.default_config.Global.n = 2
179 self.default_config.Global.reset_config = False
186 self.default_config.Global.reset_config = False
180 self.default_config.Global.clean_logs = True
187 self.default_config.Global.clean_logs = True
181 self.default_config.Global.stop_signal = 2
188 self.default_config.Global.signal = 2
182 self.default_config.Global.daemonize = False
189 self.default_config.Global.daemonize = False
183
190
184 def create_command_line_config(self):
191 def create_command_line_config(self):
@@ -209,28 +216,6 b' class IPClusterApp(ApplicationWithClusterDir):'
209 "information about creating and listing cluster dirs."
216 "information about creating and listing cluster dirs."
210 )
217 )
211
218
212 def pre_construct(self):
213 super(IPClusterApp, self).pre_construct()
214 config = self.master_config
215 try:
216 daemon = config.Global.daemonize
217 if daemon:
218 config.Global.log_to_file = True
219 except AttributeError:
220 pass
221
222 def construct(self):
223 config = self.master_config
224 if config.Global.subcommand=='list':
225 pass
226 elif config.Global.subcommand=='create':
227 self.log.info('Copying default config files to cluster directory '
228 '[overwrite=%r]' % (config.Global.reset_config,))
229 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
230 elif config.Global.subcommand=='start':
231 self.start_logging()
232 reactor.callWhenRunning(self.start_launchers)
233
234 def list_cluster_dirs(self):
219 def list_cluster_dirs(self):
235 # Find the search paths
220 # Find the search paths
236 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
221 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
@@ -256,6 +241,28 b' class IPClusterApp(ApplicationWithClusterDir):'
256 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
241 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
257 print start_cmd + " ==> " + full_path
242 print start_cmd + " ==> " + full_path
258
243
244 def pre_construct(self):
245 super(IPClusterApp, self).pre_construct()
246 config = self.master_config
247 try:
248 daemon = config.Global.daemonize
249 if daemon:
250 config.Global.log_to_file = True
251 except AttributeError:
252 pass
253
254 def construct(self):
255 config = self.master_config
256 if config.Global.subcommand=='list':
257 pass
258 elif config.Global.subcommand=='create':
259 self.log.info('Copying default config files to cluster directory '
260 '[overwrite=%r]' % (config.Global.reset_config,))
261 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
262 elif config.Global.subcommand=='start':
263 self.start_logging()
264 reactor.callWhenRunning(self.start_launchers)
265
259 def start_launchers(self):
266 def start_launchers(self):
260 config = self.master_config
267 config = self.master_config
261
268
@@ -326,51 +333,63 b' class IPClusterApp(ApplicationWithClusterDir):'
326
333
327 def start_app(self):
334 def start_app(self):
328 """Start the application, depending on what subcommand is used."""
335 """Start the application, depending on what subcommand is used."""
329 config = self.master_config
336 subcmd = self.master_config.Global.subcommand
330 subcmd = config.Global.subcommand
331 if subcmd=='create' or subcmd=='list':
337 if subcmd=='create' or subcmd=='list':
332 return
338 return
333 elif subcmd=='start':
339 elif subcmd=='start':
334 # First see if the cluster is already running
340 self.start_app_start()
335 try:
336 pid = self.get_pid_from_file()
337 except:
338 pass
339 else:
340 self.log.critical(
341 'Cluster is already running with [pid=%s]. '
342 'use "ipcluster stop" to stop the cluster.' % pid
343 )
344 # Here I exit with a unusual exit status that other processes
345 # can watch for to learn how I existed.
346 sys.exit(10)
347 # Now log and daemonize
348 self.log.info('Starting ipcluster with [daemon=%r]' % config.Global.daemonize)
349 if config.Global.daemonize:
350 if os.name=='posix':
351 os.chdir(config.Global.cluster_dir)
352 self.log_level = 40
353 daemonize()
354
355 # Now write the new pid file after our new forked pid is active.
356 self.write_pid_file()
357 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
358 reactor.run()
359 elif subcmd=='stop':
341 elif subcmd=='stop':
360 try:
342 self.start_app_stop()
361 pid = self.get_pid_from_file()
343
362 except PIDFileError:
344 def start_app_start(self):
363 self.log.critical(
345 """Start the app for the start subcommand."""
364 'Problem reading pid file, cluster is probably not running.'
346 config = self.master_config
365 )
347 # First see if the cluster is already running
366 # Here I exit with a unusual exit status that other processes
348 try:
367 # can watch for to learn how I existed.
349 pid = self.get_pid_from_file()
368 sys.exit(11)
350 except PIDFileError:
369 sig = config.Global.stop_signal
351 pass
370 self.log.info(
352 else:
371 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
353 self.log.critical(
354 'Cluster is already running with [pid=%s]. '
355 'use "ipcluster stop" to stop the cluster.' % pid
356 )
357 # Here I exit with a unusual exit status that other processes
358 # can watch for to learn how I existed.
359 self.exit(ALREADY_STARTED)
360
361 # Now log and daemonize
362 self.log.info(
363 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
364 )
365 if config.Global.daemonize:
366 if os.name=='posix':
367 daemonize()
368
369 # Now write the new pid file AFTER our new forked pid is active.
370 self.write_pid_file()
371 # cd to the cluster_dir as our working directory.
372 os.chdir(config.Global.cluster_dir)
373 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
374 reactor.run()
375
376 def start_app_stop(self):
377 """Start the app for the stop subcommand."""
378 config = self.master_config
379 try:
380 pid = self.get_pid_from_file()
381 except PIDFileError:
382 self.log.critical(
383 'Problem reading pid file, cluster is probably not running.'
372 )
384 )
373 os.kill(pid, sig)
385 # Here I exit with a unusual exit status that other processes
386 # can watch for to learn how I existed.
387 self.exit(ALREADY_STOPPED)
388 sig = config.Global.signal
389 self.log.info(
390 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
391 )
392 os.kill(pid, sig)
374
393
375
394
376 def launch_new_instance():
395 def launch_new_instance():
@@ -244,9 +244,15 b' class IPControllerApp(ApplicationWithClusterDir):'
244 log.msg("Error running statement: %s" % s)
244 log.msg("Error running statement: %s" % s)
245
245
246 def start_app(self):
246 def start_app(self):
247 # Start the controller service and set things running
247 # Start the controller service.
248 self.main_service.startService()
248 self.main_service.startService()
249 # Write the .pid file overwriting old ones. This allow multiple
250 # controllers to clober each other. But Windows is not cleaning
251 # these up properly.
249 self.write_pid_file(overwrite=True)
252 self.write_pid_file(overwrite=True)
253 # cd to the cluster_dir as our working directory.
254 os.chdir(self.master_config.Global.cluster_dir)
255 # Add a trigger to delete the .pid file upon shutting down.
250 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
256 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
251 reactor.run()
257 reactor.run()
252
258
@@ -224,7 +224,8 b' class IPEngineApp(ApplicationWithClusterDir):'
224 log.msg("Error executing statement: %s" % line)
224 log.msg("Error executing statement: %s" % line)
225
225
226 def start_app(self):
226 def start_app(self):
227 # Start the controller service and set things running
227 # cd to the cluster_dir as our working directory.
228 os.chdir(self.master_config.Global.cluster_dir)
228 reactor.run()
229 reactor.run()
229
230
230
231
General Comments 0
You need to be logged in to leave comments. Login now