##// END OF EJS Templates
Reworking how controller and engines startup in ipcluster....
bgranger -
Show More
@@ -454,7 +454,8 b' class ApplicationWithClusterDir(Application):'
454 454 """Remove the pid file.
455 455
456 456 This should be called at shutdown by registering a callback with
457 :func:`reactor.addSystemEventTrigger`.
457 :func:`reactor.addSystemEventTrigger`. This needs to return
458 ``None``.
458 459 """
459 460 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
460 461 if os.path.isfile(pid_file):
@@ -463,7 +464,6 b' class ApplicationWithClusterDir(Application):'
463 464 os.remove(pid_file)
464 465 except:
465 466 self.log.warn("Error removing the pid file: %s" % pid_file)
466 raise
467 467
468 468 def get_pid_from_file(self):
469 469 """Get the pid from the pid file.
@@ -32,8 +32,8 b' from IPython.kernel.clusterdir import ('
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 from twisted.internet import reactor
36 from twisted.python import log
35 from twisted.internet import reactor, defer
36 from twisted.python import log, failure
37 37
38 38
39 39 #-----------------------------------------------------------------------------
@@ -283,31 +283,43 b' class IPClusterApp(ApplicationWithClusterDir):'
283 283 )
284 284
285 285 # Setup signals
286 signal.signal(signal.SIGINT, self.stop_launchers)
286 signal.signal(signal.SIGINT, self.sigint_handler)
287 287
288 # Setup the observing of stopping
288 # Setup the observing of stopping. If the controller dies, shut
289 # everything down as that will be completely fatal for the engines.
289 290 d1 = self.controller_launcher.observe_stop()
290 d1.addCallback(self.stop_engines)
291 d1.addErrback(self.err_and_stop)
292 # If this triggers, just let them die
293 # d2 = self.engine_launcher.observe_stop()
291 d1.addCallback(self.stop_launchers)
292 # But, we don't monitor the stopping of engines. An engine dying
293 # is just fine and in principle a user could start a new engine.
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, other wait until
298 # all stop. TODO: think more about how to handle this.
294 299
295 300 # Start the controller and engines
301 self._stopping = False # Make sure stop_launchers is not called 2x.
302 d = self.start_controller()
303 d.addCallback(self.start_engines)
304 d.addCallback(self.startup_message)
305 # If the controller or engines fail to start, stop everything
306 d.addErrback(self.stop_launchers)
307 return d
308
309 def startup_message(self, r=None):
310 log.msg("IPython cluster: started")
311 return r
312
313 def start_controller(self, r=None):
314 # log.msg("In start_controller")
315 config = self.master_config
296 316 d = self.controller_launcher.start(
297 317 cluster_dir=config.Global.cluster_dir
298 318 )
299 d.addCallback(lambda _: self.start_engines())
300 d.addErrback(self.err_and_stop)
301
302 def err_and_stop(self, f):
303 log.msg('Unexpected error in ipcluster:')
304 log.err(f)
305 reactor.stop()
306
307 def stop_engines(self, r):
308 return self.engine_launcher.stop()
309
310 def start_engines(self):
319 return d
320
321 def start_engines(self, r=None):
322 # log.msg("In start_engines")
311 323 config = self.master_config
312 324 d = self.engine_launcher.start(
313 325 config.Global.n,
@@ -315,25 +327,55 b' class IPClusterApp(ApplicationWithClusterDir):'
315 327 )
316 328 return d
317 329
318 def stop_launchers(self, signum, frame):
319 log.msg("Stopping cluster")
320 d1 = self.engine_launcher.stop()
321 d2 = self.controller_launcher.stop()
322 # d1.addCallback(lambda _: self.controller_launcher.stop)
323 d1.addErrback(self.err_and_stop)
324 d2.addErrback(self.err_and_stop)
325 reactor.callLater(2.0, reactor.stop)
330 def stop_controller(self, r=None):
331 # log.msg("In stop_controller")
332 if self.controller_launcher.running:
333 d = self.controller_launcher.stop()
334 d.addErrback(self.log_err)
335 return d
336 else:
337 return defer.succeed(None)
338
339 def stop_engines(self, r=None):
340 # log.msg("In stop_engines")
341 if self.engine_launcher.running:
342 d = self.engine_launcher.stop()
343 d.addErrback(self.log_err)
344 return d
345 else:
346 return defer.succeed(None)
326 347
348 def log_err(self, f):
349 log.msg(f.getTraceback())
350 return None
351
352 def stop_launchers(self, r=None):
353 if not self._stopping:
354 self._stopping = True
355 if isinstance(r, failure.Failure):
356 log.msg('Unexpected error in ipcluster:')
357 log.msg(r.getTraceback())
358 log.msg("IPython cluster: stopping")
359 d= self.stop_engines()
360 d2 = self.stop_controller()
361 # Wait a few seconds to let things shut down.
362 reactor.callLater(3.0, reactor.stop)
363
364 def sigint_handler(self, signum, frame):
365 self.stop_launchers()
366
327 367 def start_logging(self):
328 # Remove old log files
368 # Remove old log files of the controller and engine
329 369 if self.master_config.Global.clean_logs:
330 370 log_dir = self.master_config.Global.log_dir
331 371 for f in os.listdir(log_dir):
332 if f.startswith('ipengine' + '-') and f.endswith('.log'):
333 os.remove(os.path.join(log_dir, f))
334 for f in os.listdir(log_dir):
335 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
336 os.remove(os.path.join(log_dir, f))
372 if f.startswith('ipengine' + '-'):
373 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
374 os.remove(os.path.join(log_dir, f))
375 if f.startswith('ipcontroller' + '-'):
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 os.remove(os.path.join(log_dir, f))
378 # This will remote old log files for ipcluster itself
337 379 super(IPClusterApp, self).start_logging()
338 380
339 381 def start_app(self):
@@ -252,8 +252,8 b' class IPControllerTask(WinHPCTask):'
252 252 controller_cmd = List(['ipcontroller.exe'], config=True)
253 253 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
254 254 # I don't want these to be configurable
255 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
256 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
255 std_out_file_path = CStr('', config=False)
256 std_err_file_path = CStr('', config=False)
257 257 min_cores = Int(1, config=False)
258 258 max_cores = Int(1, config=False)
259 259 min_sockets = Int(1, config=False)
@@ -263,6 +263,12 b' class IPControllerTask(WinHPCTask):'
263 263 unit_type = Str("Core", config=False)
264 264 work_directory = CStr('', config=False)
265 265
266 def __init__(self, parent, name=None, config=None):
267 super(IPControllerTask, self).__init__(parent, name, config)
268 the_uuid = uuid.uuid1()
269 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
270 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
271
266 272 @property
267 273 def command_line(self):
268 274 return ' '.join(self.controller_cmd + self.controller_args)
@@ -274,8 +280,8 b' class IPEngineTask(WinHPCTask):'
274 280 engine_cmd = List(['ipengine.exe'], config=True)
275 281 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
276 282 # I don't want these to be configurable
277 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
278 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
283 std_out_file_path = CStr('', config=False)
284 std_err_file_path = CStr('', config=False)
279 285 min_cores = Int(1, config=False)
280 286 max_cores = Int(1, config=False)
281 287 min_sockets = Int(1, config=False)
@@ -285,6 +291,12 b' class IPEngineTask(WinHPCTask):'
285 291 unit_type = Str("Core", config=False)
286 292 work_directory = CStr('', config=False)
287 293
294 def __init__(self, parent, name=None, config=None):
295 super(IPEngineTask,self).__init__(parent, name, config)
296 the_uuid = uuid.uuid1()
297 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
298 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
299
288 300 @property
289 301 def command_line(self):
290 302 return ' '.join(self.engine_cmd + self.engine_args)
General Comments 0
You need to be logged in to leave comments. Login now