Show More
@@ -454,7 +454,8 b' class ApplicationWithClusterDir(Application):' | |||||
454 | """Remove the pid file. |
|
454 | """Remove the pid file. | |
455 |
|
455 | |||
456 | This should be called at shutdown by registering a callback with |
|
456 | This should be called at shutdown by registering a callback with | |
457 | :func:`reactor.addSystemEventTrigger`. |
|
457 | :func:`reactor.addSystemEventTrigger`. This needs to return | |
|
458 | ``None``. | |||
458 | """ |
|
459 | """ | |
459 | pid_file = os.path.join(self.pid_dir, self.name + u'.pid') |
|
460 | pid_file = os.path.join(self.pid_dir, self.name + u'.pid') | |
460 | if os.path.isfile(pid_file): |
|
461 | if os.path.isfile(pid_file): | |
@@ -463,7 +464,6 b' class ApplicationWithClusterDir(Application):' | |||||
463 | os.remove(pid_file) |
|
464 | os.remove(pid_file) | |
464 | except: |
|
465 | except: | |
465 | self.log.warn("Error removing the pid file: %s" % pid_file) |
|
466 | self.log.warn("Error removing the pid file: %s" % pid_file) | |
466 | raise |
|
|||
467 |
|
467 | |||
468 | def get_pid_from_file(self): |
|
468 | def get_pid_from_file(self): | |
469 | """Get the pid from the pid file. |
|
469 | """Get the pid from the pid file. |
@@ -32,8 +32,8 b' from IPython.kernel.clusterdir import (' | |||||
32 | ApplicationWithClusterDir, ClusterDirError, PIDFileError |
|
32 | ApplicationWithClusterDir, ClusterDirError, PIDFileError | |
33 | ) |
|
33 | ) | |
34 |
|
34 | |||
35 | from twisted.internet import reactor |
|
35 | from twisted.internet import reactor, defer | |
36 | from twisted.python import log |
|
36 | from twisted.python import log, failure | |
37 |
|
37 | |||
38 |
|
38 | |||
39 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
@@ -283,31 +283,43 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
283 | ) |
|
283 | ) | |
284 |
|
284 | |||
285 | # Setup signals |
|
285 | # Setup signals | |
286 |
signal.signal(signal.SIGINT, self.s |
|
286 | signal.signal(signal.SIGINT, self.sigint_handler) | |
287 |
|
287 | |||
288 | # Setup the observing of stopping |
|
288 | # Setup the observing of stopping. If the controller dies, shut | |
|
289 | # everything down as that will be completely fatal for the engines. | |||
289 | d1 = self.controller_launcher.observe_stop() |
|
290 | d1 = self.controller_launcher.observe_stop() | |
290 |
d1.addCallback(self.stop_ |
|
291 | d1.addCallback(self.stop_launchers) | |
291 | d1.addErrback(self.err_and_stop) |
|
292 | # But, we don't monitor the stopping of engines. An engine dying | |
292 | # If this triggers, just let them die |
|
293 | # is just fine and in principle a user could start a new engine. | |
293 | # d2 = self.engine_launcher.observe_stop() |
|
294 | # Also, if we did monitor engine stopping, it is difficult to | |
|
295 | # know what to do when only some engines die. Currently, the | |||
|
296 | # observing of engine stopping is inconsistent. Some launchers | |||
|
297 | # might trigger on a single engine stopping, other wait until | |||
|
298 | # all stop. TODO: think more about how to handle this. | |||
294 |
|
299 | |||
295 | # Start the controller and engines |
|
300 | # Start the controller and engines | |
|
301 | self._stopping = False # Make sure stop_launchers is not called 2x. | |||
|
302 | d = self.start_controller() | |||
|
303 | d.addCallback(self.start_engines) | |||
|
304 | d.addCallback(self.startup_message) | |||
|
305 | # If the controller or engines fail to start, stop everything | |||
|
306 | d.addErrback(self.stop_launchers) | |||
|
307 | return d | |||
|
308 | ||||
|
309 | def startup_message(self, r=None): | |||
|
310 | log.msg("IPython cluster: started") | |||
|
311 | return r | |||
|
312 | ||||
|
313 | def start_controller(self, r=None): | |||
|
314 | # log.msg("In start_controller") | |||
|
315 | config = self.master_config | |||
296 | d = self.controller_launcher.start( |
|
316 | d = self.controller_launcher.start( | |
297 | cluster_dir=config.Global.cluster_dir |
|
317 | cluster_dir=config.Global.cluster_dir | |
298 | ) |
|
318 | ) | |
299 | d.addCallback(lambda _: self.start_engines()) |
|
319 | return d | |
300 | d.addErrback(self.err_and_stop) |
|
|||
301 |
|
||||
302 | def err_and_stop(self, f): |
|
|||
303 | log.msg('Unexpected error in ipcluster:') |
|
|||
304 | log.err(f) |
|
|||
305 | reactor.stop() |
|
|||
306 |
|
||||
307 | def stop_engines(self, r): |
|
|||
308 | return self.engine_launcher.stop() |
|
|||
309 |
|
320 | |||
310 | def start_engines(self): |
|
321 | def start_engines(self, r=None): | |
|
322 | # log.msg("In start_engines") | |||
311 | config = self.master_config |
|
323 | config = self.master_config | |
312 | d = self.engine_launcher.start( |
|
324 | d = self.engine_launcher.start( | |
313 | config.Global.n, |
|
325 | config.Global.n, | |
@@ -315,25 +327,55 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
315 | ) |
|
327 | ) | |
316 | return d |
|
328 | return d | |
317 |
|
329 | |||
318 |
def stop_l |
|
330 | def stop_controller(self, r=None): | |
319 |
log.msg(" |
|
331 | # log.msg("In stop_controller") | |
320 | d1 = self.engine_launcher.stop() |
|
332 | if self.controller_launcher.running: | |
321 |
d |
|
333 | d = self.controller_launcher.stop() | |
322 | # d1.addCallback(lambda _: self.controller_launcher.stop) |
|
334 | d.addErrback(self.log_err) | |
323 | d1.addErrback(self.err_and_stop) |
|
335 | return d | |
324 | d2.addErrback(self.err_and_stop) |
|
336 | else: | |
325 | reactor.callLater(2.0, reactor.stop) |
|
337 | return defer.succeed(None) | |
|
338 | ||||
|
339 | def stop_engines(self, r=None): | |||
|
340 | # log.msg("In stop_engines") | |||
|
341 | if self.engine_launcher.running: | |||
|
342 | d = self.engine_launcher.stop() | |||
|
343 | d.addErrback(self.log_err) | |||
|
344 | return d | |||
|
345 | else: | |||
|
346 | return defer.succeed(None) | |||
|
347 | ||||
|
348 | def log_err(self, f): | |||
|
349 | log.msg(f.getTraceback()) | |||
|
350 | return None | |||
|
351 | ||||
|
352 | def stop_launchers(self, r=None): | |||
|
353 | if not self._stopping: | |||
|
354 | self._stopping = True | |||
|
355 | if isinstance(r, failure.Failure): | |||
|
356 | log.msg('Unexpected error in ipcluster:') | |||
|
357 | log.msg(r.getTraceback()) | |||
|
358 | log.msg("IPython cluster: stopping") | |||
|
359 | d= self.stop_engines() | |||
|
360 | d2 = self.stop_controller() | |||
|
361 | # Wait a few seconds to let things shut down. | |||
|
362 | reactor.callLater(3.0, reactor.stop) | |||
|
363 | ||||
|
364 | def sigint_handler(self, signum, frame): | |||
|
365 | self.stop_launchers() | |||
326 |
|
366 | |||
327 | def start_logging(self): |
|
367 | def start_logging(self): | |
328 | # Remove old log files |
|
368 | # Remove old log files of the controller and engine | |
329 | if self.master_config.Global.clean_logs: |
|
369 | if self.master_config.Global.clean_logs: | |
330 | log_dir = self.master_config.Global.log_dir |
|
370 | log_dir = self.master_config.Global.log_dir | |
331 | for f in os.listdir(log_dir): |
|
371 | for f in os.listdir(log_dir): | |
332 |
if f.startswith('ipengine' + '-') |
|
372 | if f.startswith('ipengine' + '-'): | |
|
373 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |||
333 | os.remove(os.path.join(log_dir, f)) |
|
374 | os.remove(os.path.join(log_dir, f)) | |
334 | for f in os.listdir(log_dir): |
|
375 | if f.startswith('ipcontroller' + '-'): | |
335 | if f.startswith('ipcontroller' + '-') and f.endswith('.log'): |
|
376 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |
336 | os.remove(os.path.join(log_dir, f)) |
|
377 | os.remove(os.path.join(log_dir, f)) | |
|
378 | # This will remote old log files for ipcluster itself | |||
337 | super(IPClusterApp, self).start_logging() |
|
379 | super(IPClusterApp, self).start_logging() | |
338 |
|
380 | |||
339 | def start_app(self): |
|
381 | def start_app(self): |
@@ -252,8 +252,8 b' class IPControllerTask(WinHPCTask):' | |||||
252 | controller_cmd = List(['ipcontroller.exe'], config=True) |
|
252 | controller_cmd = List(['ipcontroller.exe'], config=True) | |
253 | controller_args = List(['--log-to-file', '--log-level', '40'], config=True) |
|
253 | controller_args = List(['--log-to-file', '--log-level', '40'], config=True) | |
254 | # I don't want these to be configurable |
|
254 | # I don't want these to be configurable | |
255 |
std_out_file_path = CStr( |
|
255 | std_out_file_path = CStr('', config=False) | |
256 |
std_err_file_path = CStr( |
|
256 | std_err_file_path = CStr('', config=False) | |
257 | min_cores = Int(1, config=False) |
|
257 | min_cores = Int(1, config=False) | |
258 | max_cores = Int(1, config=False) |
|
258 | max_cores = Int(1, config=False) | |
259 | min_sockets = Int(1, config=False) |
|
259 | min_sockets = Int(1, config=False) | |
@@ -263,6 +263,12 b' class IPControllerTask(WinHPCTask):' | |||||
263 | unit_type = Str("Core", config=False) |
|
263 | unit_type = Str("Core", config=False) | |
264 | work_directory = CStr('', config=False) |
|
264 | work_directory = CStr('', config=False) | |
265 |
|
265 | |||
|
266 | def __init__(self, parent, name=None, config=None): | |||
|
267 | super(IPControllerTask, self).__init__(parent, name, config) | |||
|
268 | the_uuid = uuid.uuid1() | |||
|
269 | self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid) | |||
|
270 | self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid) | |||
|
271 | ||||
266 | @property |
|
272 | @property | |
267 | def command_line(self): |
|
273 | def command_line(self): | |
268 | return ' '.join(self.controller_cmd + self.controller_args) |
|
274 | return ' '.join(self.controller_cmd + self.controller_args) | |
@@ -274,8 +280,8 b' class IPEngineTask(WinHPCTask):' | |||||
274 | engine_cmd = List(['ipengine.exe'], config=True) |
|
280 | engine_cmd = List(['ipengine.exe'], config=True) | |
275 | engine_args = List(['--log-to-file', '--log-level', '40'], config=True) |
|
281 | engine_args = List(['--log-to-file', '--log-level', '40'], config=True) | |
276 | # I don't want these to be configurable |
|
282 | # I don't want these to be configurable | |
277 |
std_out_file_path = CStr( |
|
283 | std_out_file_path = CStr('', config=False) | |
278 |
std_err_file_path = CStr( |
|
284 | std_err_file_path = CStr('', config=False) | |
279 | min_cores = Int(1, config=False) |
|
285 | min_cores = Int(1, config=False) | |
280 | max_cores = Int(1, config=False) |
|
286 | max_cores = Int(1, config=False) | |
281 | min_sockets = Int(1, config=False) |
|
287 | min_sockets = Int(1, config=False) | |
@@ -285,6 +291,12 b' class IPEngineTask(WinHPCTask):' | |||||
285 | unit_type = Str("Core", config=False) |
|
291 | unit_type = Str("Core", config=False) | |
286 | work_directory = CStr('', config=False) |
|
292 | work_directory = CStr('', config=False) | |
287 |
|
293 | |||
|
294 | def __init__(self, parent, name=None, config=None): | |||
|
295 | super(IPEngineTask,self).__init__(parent, name, config) | |||
|
296 | the_uuid = uuid.uuid1() | |||
|
297 | self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid) | |||
|
298 | self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid) | |||
|
299 | ||||
288 | @property |
|
300 | @property | |
289 | def command_line(self): |
|
301 | def command_line(self): | |
290 | return ' '.join(self.engine_cmd + self.engine_args) |
|
302 | return ' '.join(self.engine_cmd + self.engine_args) |
General Comments 0
You need to be logged in to leave comments.
Login now