##// END OF EJS Templates
Reworking how controller and engines startup in ipcluster....
bgranger -
Show More
@@ -454,7 +454,8 b' class ApplicationWithClusterDir(Application):'
454 """Remove the pid file.
454 """Remove the pid file.
455
455
456 This should be called at shutdown by registering a callback with
456 This should be called at shutdown by registering a callback with
457 :func:`reactor.addSystemEventTrigger`.
457 :func:`reactor.addSystemEventTrigger`. This needs to return
458 ``None``.
458 """
459 """
459 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
460 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
460 if os.path.isfile(pid_file):
461 if os.path.isfile(pid_file):
@@ -463,7 +464,6 b' class ApplicationWithClusterDir(Application):'
463 os.remove(pid_file)
464 os.remove(pid_file)
464 except:
465 except:
465 self.log.warn("Error removing the pid file: %s" % pid_file)
466 self.log.warn("Error removing the pid file: %s" % pid_file)
466 raise
467
467
468 def get_pid_from_file(self):
468 def get_pid_from_file(self):
469 """Get the pid from the pid file.
469 """Get the pid from the pid file.
@@ -32,8 +32,8 b' from IPython.kernel.clusterdir import ('
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor
35 from twisted.internet import reactor, defer
36 from twisted.python import log
36 from twisted.python import log, failure
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
@@ -283,31 +283,43 b' class IPClusterApp(ApplicationWithClusterDir):'
283 )
283 )
284
284
285 # Setup signals
285 # Setup signals
286 signal.signal(signal.SIGINT, self.stop_launchers)
286 signal.signal(signal.SIGINT, self.sigint_handler)
287
287
288 # Setup the observing of stopping
288 # Setup the observing of stopping. If the controller dies, shut
289 # everything down as that will be completely fatal for the engines.
289 d1 = self.controller_launcher.observe_stop()
290 d1 = self.controller_launcher.observe_stop()
290 d1.addCallback(self.stop_engines)
291 d1.addCallback(self.stop_launchers)
291 d1.addErrback(self.err_and_stop)
292 # But, we don't monitor the stopping of engines. An engine dying
292 # If this triggers, just let them die
293 # is just fine and in principle a user could start a new engine.
293 # d2 = self.engine_launcher.observe_stop()
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, other wait until
298 # all stop. TODO: think more about how to handle this.
294
299
295 # Start the controller and engines
300 # Start the controller and engines
301 self._stopping = False # Make sure stop_launchers is not called 2x.
302 d = self.start_controller()
303 d.addCallback(self.start_engines)
304 d.addCallback(self.startup_message)
305 # If the controller or engines fail to start, stop everything
306 d.addErrback(self.stop_launchers)
307 return d
308
309 def startup_message(self, r=None):
310 log.msg("IPython cluster: started")
311 return r
312
313 def start_controller(self, r=None):
314 # log.msg("In start_controller")
315 config = self.master_config
296 d = self.controller_launcher.start(
316 d = self.controller_launcher.start(
297 cluster_dir=config.Global.cluster_dir
317 cluster_dir=config.Global.cluster_dir
298 )
318 )
299 d.addCallback(lambda _: self.start_engines())
319 return d
300 d.addErrback(self.err_and_stop)
301
302 def err_and_stop(self, f):
303 log.msg('Unexpected error in ipcluster:')
304 log.err(f)
305 reactor.stop()
306
307 def stop_engines(self, r):
308 return self.engine_launcher.stop()
309
320
310 def start_engines(self):
321 def start_engines(self, r=None):
322 # log.msg("In start_engines")
311 config = self.master_config
323 config = self.master_config
312 d = self.engine_launcher.start(
324 d = self.engine_launcher.start(
313 config.Global.n,
325 config.Global.n,
@@ -315,25 +327,55 b' class IPClusterApp(ApplicationWithClusterDir):'
315 )
327 )
316 return d
328 return d
317
329
318 def stop_launchers(self, signum, frame):
330 def stop_controller(self, r=None):
319 log.msg("Stopping cluster")
331 # log.msg("In stop_controller")
320 d1 = self.engine_launcher.stop()
332 if self.controller_launcher.running:
321 d2 = self.controller_launcher.stop()
333 d = self.controller_launcher.stop()
322 # d1.addCallback(lambda _: self.controller_launcher.stop)
334 d.addErrback(self.log_err)
323 d1.addErrback(self.err_and_stop)
335 return d
324 d2.addErrback(self.err_and_stop)
336 else:
325 reactor.callLater(2.0, reactor.stop)
337 return defer.succeed(None)
338
339 def stop_engines(self, r=None):
340 # log.msg("In stop_engines")
341 if self.engine_launcher.running:
342 d = self.engine_launcher.stop()
343 d.addErrback(self.log_err)
344 return d
345 else:
346 return defer.succeed(None)
347
348 def log_err(self, f):
349 log.msg(f.getTraceback())
350 return None
351
352 def stop_launchers(self, r=None):
353 if not self._stopping:
354 self._stopping = True
355 if isinstance(r, failure.Failure):
356 log.msg('Unexpected error in ipcluster:')
357 log.msg(r.getTraceback())
358 log.msg("IPython cluster: stopping")
359 d= self.stop_engines()
360 d2 = self.stop_controller()
361 # Wait a few seconds to let things shut down.
362 reactor.callLater(3.0, reactor.stop)
363
364 def sigint_handler(self, signum, frame):
365 self.stop_launchers()
326
366
327 def start_logging(self):
367 def start_logging(self):
328 # Remove old log files
368 # Remove old log files of the controller and engine
329 if self.master_config.Global.clean_logs:
369 if self.master_config.Global.clean_logs:
330 log_dir = self.master_config.Global.log_dir
370 log_dir = self.master_config.Global.log_dir
331 for f in os.listdir(log_dir):
371 for f in os.listdir(log_dir):
332 if f.startswith('ipengine' + '-') and f.endswith('.log'):
372 if f.startswith('ipengine' + '-'):
373 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
333 os.remove(os.path.join(log_dir, f))
374 os.remove(os.path.join(log_dir, f))
334 for f in os.listdir(log_dir):
375 if f.startswith('ipcontroller' + '-'):
335 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
336 os.remove(os.path.join(log_dir, f))
377 os.remove(os.path.join(log_dir, f))
378 # This will remote old log files for ipcluster itself
337 super(IPClusterApp, self).start_logging()
379 super(IPClusterApp, self).start_logging()
338
380
339 def start_app(self):
381 def start_app(self):
@@ -252,8 +252,8 b' class IPControllerTask(WinHPCTask):'
252 controller_cmd = List(['ipcontroller.exe'], config=True)
252 controller_cmd = List(['ipcontroller.exe'], config=True)
253 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
253 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
254 # I don't want these to be configurable
254 # I don't want these to be configurable
255 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
255 std_out_file_path = CStr('', config=False)
256 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
256 std_err_file_path = CStr('', config=False)
257 min_cores = Int(1, config=False)
257 min_cores = Int(1, config=False)
258 max_cores = Int(1, config=False)
258 max_cores = Int(1, config=False)
259 min_sockets = Int(1, config=False)
259 min_sockets = Int(1, config=False)
@@ -263,6 +263,12 b' class IPControllerTask(WinHPCTask):'
263 unit_type = Str("Core", config=False)
263 unit_type = Str("Core", config=False)
264 work_directory = CStr('', config=False)
264 work_directory = CStr('', config=False)
265
265
266 def __init__(self, parent, name=None, config=None):
267 super(IPControllerTask, self).__init__(parent, name, config)
268 the_uuid = uuid.uuid1()
269 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
270 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
271
266 @property
272 @property
267 def command_line(self):
273 def command_line(self):
268 return ' '.join(self.controller_cmd + self.controller_args)
274 return ' '.join(self.controller_cmd + self.controller_args)
@@ -274,8 +280,8 b' class IPEngineTask(WinHPCTask):'
274 engine_cmd = List(['ipengine.exe'], config=True)
280 engine_cmd = List(['ipengine.exe'], config=True)
275 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
281 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
276 # I don't want these to be configurable
282 # I don't want these to be configurable
277 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
283 std_out_file_path = CStr('', config=False)
278 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
284 std_err_file_path = CStr('', config=False)
279 min_cores = Int(1, config=False)
285 min_cores = Int(1, config=False)
280 max_cores = Int(1, config=False)
286 max_cores = Int(1, config=False)
281 min_sockets = Int(1, config=False)
287 min_sockets = Int(1, config=False)
@@ -285,6 +291,12 b' class IPEngineTask(WinHPCTask):'
285 unit_type = Str("Core", config=False)
291 unit_type = Str("Core", config=False)
286 work_directory = CStr('', config=False)
292 work_directory = CStr('', config=False)
287
293
294 def __init__(self, parent, name=None, config=None):
295 super(IPEngineTask,self).__init__(parent, name, config)
296 the_uuid = uuid.uuid1()
297 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
298 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
299
288 @property
300 @property
289 def command_line(self):
301 def command_line(self):
290 return ' '.join(self.engine_cmd + self.engine_args)
302 return ' '.join(self.engine_cmd + self.engine_args)
General Comments 0
You need to be logged in to leave comments. Login now