diff --git a/IPython/kernel/clusterdir.py b/IPython/kernel/clusterdir.py index 8b4ace4..356cdf6 100644 --- a/IPython/kernel/clusterdir.py +++ b/IPython/kernel/clusterdir.py @@ -454,7 +454,8 @@ class ApplicationWithClusterDir(Application): """Remove the pid file. This should be called at shutdown by registering a callback with - :func:`reactor.addSystemEventTrigger`. + :func:`reactor.addSystemEventTrigger`. This needs to return + ``None``. """ pid_file = os.path.join(self.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): @@ -463,7 +464,6 @@ class ApplicationWithClusterDir(Application): os.remove(pid_file) except: self.log.warn("Error removing the pid file: %s" % pid_file) - raise def get_pid_from_file(self): """Get the pid from the pid file. diff --git a/IPython/kernel/ipclusterapp.py b/IPython/kernel/ipclusterapp.py index 306af2f..783833e 100644 --- a/IPython/kernel/ipclusterapp.py +++ b/IPython/kernel/ipclusterapp.py @@ -32,8 +32,8 @@ from IPython.kernel.clusterdir import ( ApplicationWithClusterDir, ClusterDirError, PIDFileError ) -from twisted.internet import reactor -from twisted.python import log +from twisted.internet import reactor, defer +from twisted.python import log, failure #----------------------------------------------------------------------------- @@ -283,31 +283,43 @@ class IPClusterApp(ApplicationWithClusterDir): ) # Setup signals - signal.signal(signal.SIGINT, self.stop_launchers) + signal.signal(signal.SIGINT, self.sigint_handler) - # Setup the observing of stopping + # Setup the observing of stopping. If the controller dies, shut + # everything down as that will be completely fatal for the engines. d1 = self.controller_launcher.observe_stop() - d1.addCallback(self.stop_engines) - d1.addErrback(self.err_and_stop) - # If this triggers, just let them die - # d2 = self.engine_launcher.observe_stop() + d1.addCallback(self.stop_launchers) + # But, we don't monitor the stopping of engines. An engine dying + # is just fine and in principle a user could start a new engine. + # Also, if we did monitor engine stopping, it is difficult to + # know what to do when only some engines die. Currently, the + # observing of engine stopping is inconsistent. Some launchers + # might trigger on a single engine stopping, other wait until + # all stop. TODO: think more about how to handle this. # Start the controller and engines + self._stopping = False # Make sure stop_launchers is not called 2x. + d = self.start_controller() + d.addCallback(self.start_engines) + d.addCallback(self.startup_message) + # If the controller or engines fail to start, stop everything + d.addErrback(self.stop_launchers) + return d + + def startup_message(self, r=None): + log.msg("IPython cluster: started") + return r + + def start_controller(self, r=None): + # log.msg("In start_controller") + config = self.master_config d = self.controller_launcher.start( cluster_dir=config.Global.cluster_dir ) - d.addCallback(lambda _: self.start_engines()) - d.addErrback(self.err_and_stop) - - def err_and_stop(self, f): - log.msg('Unexpected error in ipcluster:') - log.err(f) - reactor.stop() - - def stop_engines(self, r): - return self.engine_launcher.stop() - - def start_engines(self): + return d + + def start_engines(self, r=None): + # log.msg("In start_engines") config = self.master_config d = self.engine_launcher.start( config.Global.n, @@ -315,25 +327,55 @@ class IPClusterApp(ApplicationWithClusterDir): ) return d - def stop_launchers(self, signum, frame): - log.msg("Stopping cluster") - d1 = self.engine_launcher.stop() - d2 = self.controller_launcher.stop() - # d1.addCallback(lambda _: self.controller_launcher.stop) - d1.addErrback(self.err_and_stop) - d2.addErrback(self.err_and_stop) - reactor.callLater(2.0, reactor.stop) + def stop_controller(self, r=None): + # log.msg("In stop_controller") + if self.controller_launcher.running: + d = self.controller_launcher.stop() + d.addErrback(self.log_err) + return d + else: + return defer.succeed(None) + + def stop_engines(self, r=None): + # log.msg("In stop_engines") + if self.engine_launcher.running: + d = self.engine_launcher.stop() + d.addErrback(self.log_err) + return d + else: + return defer.succeed(None) + def log_err(self, f): + log.msg(f.getTraceback()) + return None + + def stop_launchers(self, r=None): + if not self._stopping: + self._stopping = True + if isinstance(r, failure.Failure): + log.msg('Unexpected error in ipcluster:') + log.msg(r.getTraceback()) + log.msg("IPython cluster: stopping") + d= self.stop_engines() + d2 = self.stop_controller() + # Wait a few seconds to let things shut down. + reactor.callLater(3.0, reactor.stop) + + def sigint_handler(self, signum, frame): + self.stop_launchers() + def start_logging(self): - # Remove old log files + # Remove old log files of the controller and engine if self.master_config.Global.clean_logs: log_dir = self.master_config.Global.log_dir for f in os.listdir(log_dir): - if f.startswith('ipengine' + '-') and f.endswith('.log'): - os.remove(os.path.join(log_dir, f)) - for f in os.listdir(log_dir): - if f.startswith('ipcontroller' + '-') and f.endswith('.log'): - os.remove(os.path.join(log_dir, f)) + if f.startswith('ipengine' + '-'): + if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): + os.remove(os.path.join(log_dir, f)) + if f.startswith('ipcontroller' + '-'): + if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): + os.remove(os.path.join(log_dir, f)) + # This will remote old log files for ipcluster itself super(IPClusterApp, self).start_logging() def start_app(self): diff --git a/IPython/kernel/winhpcjob.py b/IPython/kernel/winhpcjob.py index 0b791fd..de1680d 100644 --- a/IPython/kernel/winhpcjob.py +++ b/IPython/kernel/winhpcjob.py @@ -252,8 +252,8 @@ class IPControllerTask(WinHPCTask): controller_cmd = List(['ipcontroller.exe'], config=True) controller_args = List(['--log-to-file', '--log-level', '40'], config=True) # I don't want these to be configurable - std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False) - std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False) + std_out_file_path = CStr('', config=False) + std_err_file_path = CStr('', config=False) min_cores = Int(1, config=False) max_cores = Int(1, config=False) min_sockets = Int(1, config=False) @@ -263,6 +263,12 @@ class IPControllerTask(WinHPCTask): unit_type = Str("Core", config=False) work_directory = CStr('', config=False) + def __init__(self, parent, name=None, config=None): + super(IPControllerTask, self).__init__(parent, name, config) + the_uuid = uuid.uuid1() + self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid) + self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid) + @property def command_line(self): return ' '.join(self.controller_cmd + self.controller_args) @@ -274,8 +280,8 @@ class IPEngineTask(WinHPCTask): engine_cmd = List(['ipengine.exe'], config=True) engine_args = List(['--log-to-file', '--log-level', '40'], config=True) # I don't want these to be configurable - std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False) - std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False) + std_out_file_path = CStr('', config=False) + std_err_file_path = CStr('', config=False) min_cores = Int(1, config=False) max_cores = Int(1, config=False) min_sockets = Int(1, config=False) @@ -285,6 +291,12 @@ class IPEngineTask(WinHPCTask): unit_type = Str("Core", config=False) work_directory = CStr('', config=False) + def __init__(self, parent, name=None, config=None): + super(IPEngineTask,self).__init__(parent, name, config) + the_uuid = uuid.uuid1() + self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid) + self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid) + @property def command_line(self): return ' '.join(self.engine_cmd + self.engine_args)