Show More
@@ -282,9 +282,11 @@ class IPClusterEngines(BaseParallelApplication): | |||||
282 | if new: |
|
282 | if new: | |
283 | self.log_to_file = True |
|
283 | self.log_to_file = True | |
284 |
|
284 | |||
|
285 | early_shutdown = Int(30, config=True, help="The timeout (in seconds)") | |||
|
286 | _stopping = False | |||
|
287 | ||||
285 | aliases = Dict(engine_aliases) |
|
288 | aliases = Dict(engine_aliases) | |
286 | flags = Dict(engine_flags) |
|
289 | flags = Dict(engine_flags) | |
287 | _stopping = False |
|
|||
288 |
|
290 | |||
289 | def initialize(self, argv=None): |
|
291 | def initialize(self, argv=None): | |
290 | super(IPClusterEngines, self).initialize(argv) |
|
292 | super(IPClusterEngines, self).initialize(argv) | |
@@ -293,7 +295,6 @@ class IPClusterEngines(BaseParallelApplication): | |||||
293 |
|
295 | |||
294 | def init_launchers(self): |
|
296 | def init_launchers(self): | |
295 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') |
|
297 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') | |
296 | self.engine_launcher.on_stop(lambda r: self.loop.stop()) |
|
|||
297 |
|
298 | |||
298 | def init_signal(self): |
|
299 | def init_signal(self): | |
299 | # Setup signals |
|
300 | # Setup signals | |
@@ -320,13 +321,42 @@ class IPClusterEngines(BaseParallelApplication): | |||||
320 | ) |
|
321 | ) | |
321 | return launcher |
|
322 | return launcher | |
322 |
|
323 | |||
|
324 | def engines_started_okay(self): | |||
|
325 | self.log.info("Engines appear to have started successfully") | |||
|
326 | self.early_shutdown = 0 | |||
|
327 | ||||
323 | def start_engines(self): |
|
328 | def start_engines(self): | |
324 | self.log.info("Starting %i engines"%self.n) |
|
329 | self.log.info("Starting %i engines"%self.n) | |
325 | self.engine_launcher.start(self.n) |
|
330 | self.engine_launcher.start(self.n) | |
|
331 | self.engine_launcher.on_stop(self.engines_stopped_early) | |||
|
332 | if self.early_shutdown: | |||
|
333 | ioloop.DelayedCallback(self.engines_started_okay, self.early_shutdown*1000, self.loop).start() | |||
|
334 | ||||
|
335 | def engines_stopped_early(self, r): | |||
|
336 | if self.early_shutdown and not self._stopping: | |||
|
337 | self.log.error(""" | |||
|
338 | Engines shutdown early, they probably failed to connect. | |||
|
339 | ||||
|
340 | Check the engine log files for output. | |||
|
341 | ||||
|
342 | If your controller and engines are not on the same machine, you probably | |||
|
343 | have to instruct the controller to listen on an interface other than localhost. | |||
|
344 | ||||
|
345 | You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args. | |||
|
346 | ||||
|
347 | Be sure to read our security docs before instructing your controller to listen on | |||
|
348 | a public interface. | |||
|
349 | """) | |||
|
350 | self.stop_launchers() | |||
|
351 | ||||
|
352 | return self.engines_stopped(r) | |||
|
353 | ||||
|
354 | def engines_stopped(self, r): | |||
|
355 | return self.loop.stop() | |||
326 |
|
356 | |||
327 | def stop_engines(self): |
|
357 | def stop_engines(self): | |
328 | self.log.info("Stopping Engines...") |
|
|||
329 | if self.engine_launcher.running: |
|
358 | if self.engine_launcher.running: | |
|
359 | self.log.info("Stopping Engines...") | |||
330 | d = self.engine_launcher.stop() |
|
360 | d = self.engine_launcher.stop() | |
331 | return d |
|
361 | return d | |
332 | else: |
|
362 | else: | |
@@ -338,7 +368,7 @@ class IPClusterEngines(BaseParallelApplication): | |||||
338 | self.log.error("IPython cluster: stopping") |
|
368 | self.log.error("IPython cluster: stopping") | |
339 | self.stop_engines() |
|
369 | self.stop_engines() | |
340 | # Wait a few seconds to let things shut down. |
|
370 | # Wait a few seconds to let things shut down. | |
341 |
dc = ioloop.DelayedCallback(self.loop.stop, |
|
371 | dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop) | |
342 | dc.start() |
|
372 | dc.start() | |
343 |
|
373 | |||
344 | def sigint_handler(self, signum, frame): |
|
374 | def sigint_handler(self, signum, frame): | |
@@ -458,6 +488,10 @@ class IPClusterStart(IPClusterEngines): | |||||
458 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') |
|
488 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') | |
459 | self.controller_launcher.on_stop(self.stop_launchers) |
|
489 | self.controller_launcher.on_stop(self.stop_launchers) | |
460 |
|
490 | |||
|
491 | def engines_stopped(self, r): | |||
|
492 | """prevent parent.engines_stopped from stopping everything on engine shutdown""" | |||
|
493 | pass | |||
|
494 | ||||
461 | def start_controller(self): |
|
495 | def start_controller(self): | |
462 | self.controller_launcher.start() |
|
496 | self.controller_launcher.start() | |
463 |
|
497 |
@@ -214,6 +214,14 @@ class EngineFactory(RegistrationFactory): | |||||
214 |
|
214 | |||
215 | def abort(self): |
|
215 | def abort(self): | |
216 | self.log.fatal("Registration timed out after %.1f seconds"%self.timeout) |
|
216 | self.log.fatal("Registration timed out after %.1f seconds"%self.timeout) | |
|
217 | if '127' in self.url: | |||
|
218 | self.log.fatal(""" | |||
|
219 | If the controller and engines are not on the same machine, | |||
|
220 | you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py): | |||
|
221 | c.HubFactory.ip='*' # for all interfaces, internal and external | |||
|
222 | c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see | |||
|
223 | or tunnel connections via ssh. | |||
|
224 | """) | |||
217 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) |
|
225 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) | |
218 | time.sleep(1) |
|
226 | time.sleep(1) | |
219 | sys.exit(255) |
|
227 | sys.exit(255) |
General Comments 0
You need to be logged in to leave comments.
Login now