diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py
index cca9eed..22bc5ec 100755
--- a/IPython/parallel/apps/ipclusterapp.py
+++ b/IPython/parallel/apps/ipclusterapp.py
@@ -282,9 +282,13 @@ class IPClusterEngines(BaseParallelApplication):
         if new:
             self.log_to_file = True
 
+    # Engines that stop within this many seconds of being started are treated
+    # as having failed to start (see engines_stopped_early).
+    early_shutdown = Int(30, config=True, help="The timeout (in seconds)")
+    _stopping = False
+
     aliases = Dict(engine_aliases)
     flags = Dict(engine_flags)
-    _stopping = False
 
     def initialize(self, argv=None):
         super(IPClusterEngines, self).initialize(argv)
@@ -293,7 +297,6 @@ class IPClusterEngines(BaseParallelApplication):
 
     def init_launchers(self):
         self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
-        self.engine_launcher.on_stop(lambda r: self.loop.stop())
 
     def init_signal(self):
         # Setup signals
@@ -320,13 +323,52 @@ class IPClusterEngines(BaseParallelApplication):
         )
         return launcher
 
+    def engines_started_okay(self):
+        """Mark startup as successful once the early-shutdown window has elapsed."""
+        self.log.info("Engines appear to have started successfully")
+        # Zeroing the timeout makes engines_stopped_early treat any later stop as normal.
+        self.early_shutdown = 0
+
     def start_engines(self):
+        """Launch self.n engines and watch for them stopping too soon."""
         self.log.info("Starting %i engines"%self.n)
         self.engine_launcher.start(self.n)
+        self.engine_launcher.on_stop(self.engines_stopped_early)
+        if self.early_shutdown:
+            # early_shutdown is in seconds; scale to the milliseconds DelayedCallback expects.
+            ioloop.DelayedCallback(self.engines_started_okay, self.early_shutdown*1000, self.loop).start()
+
+    def engines_stopped_early(self, r):
+        """Engine on_stop callback: report a likely connection failure, then stop.
+
+        ``r`` is whatever the engine launcher passes to its on_stop callbacks;
+        it is forwarded to engines_stopped unchanged.
+        """
+        if self.early_shutdown and not self._stopping:
+            self.log.error("""
+            Engines shutdown early, they probably failed to connect.
+
+            Check the engine log files for output.
+
+            If your controller and engines are not on the same machine, you probably
+            have to instruct the controller to listen on an interface other than localhost.
+
+            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
+
+            Be sure to read our security docs before instructing your controller to listen on
+            a public interface.
+            """)
+            self.stop_launchers()
+
+        return self.engines_stopped(r)
+
+    def engines_stopped(self, r):
+        """Stop the event loop when the engines have stopped."""
+        return self.loop.stop()
 
     def stop_engines(self):
-        self.log.info("Stopping Engines...")
         if self.engine_launcher.running:
+            self.log.info("Stopping Engines...")
             d = self.engine_launcher.stop()
             return d
         else:
@@ -338,7 +380,7 @@ class IPClusterEngines(BaseParallelApplication):
             self.log.error("IPython cluster: stopping")
             self.stop_engines()
             # Wait a few seconds to let things shut down.
-            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
+            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
             dc.start()
 
     def sigint_handler(self, signum, frame):
@@ -457,7 +499,11 @@ class IPClusterStart(IPClusterEngines):
         self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
         self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
         self.controller_launcher.on_stop(self.stop_launchers)
-    
+
+    def engines_stopped(self, r):
+        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
+        pass
+
     def start_controller(self):
         self.controller_launcher.start()
 
diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py
index 0a14f23..afd55b7 100644
--- a/IPython/parallel/engine/engine.py
+++ b/IPython/parallel/engine/engine.py
@@ -214,6 +214,17 @@ class EngineFactory(RegistrationFactory):
 
     def abort(self):
+        """Log the registration timeout, send an unregistration request, and exit with status 255."""
         self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
+        # Only print the hint when the host part of the URL is a 127.x.x.x address;
+        # a bare substring test for '127' would also fire on e.g. 10.0.127.5 or port 12700.
+        if self.url.split('://', 1)[-1].startswith('127.'):
+            self.log.fatal("""
+            If the controller and engines are not on the same machine,
+            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
+                c.HubFactory.ip='*' # for all interfaces, internal and external
+                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
+            or tunnel connections via ssh.
+            """)
         self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
         time.sleep(1)
         sys.exit(255)