##// END OF EJS Templates
Notebook cluster manager now uses proper launchers.
Notebook cluster manager now uses proper launchers.

File last commit:

r6199:fcf2e872
r6199:fcf2e872
Show More
clustermanager.py
168 lines | 6.0 KiB | text/x-python | PythonLexer
"""Manage IPython.parallel clusters in the notebook.
Authors:
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import os
from tornado import web
from zmq.eventloop import ioloop
from IPython.config.configurable import LoggingConfigurable
from IPython.config.loader import load_pyconfig_files
from IPython.utils.traitlets import Dict, Instance, CFloat
from IPython.parallel.apps.ipclusterapp import find_launcher_class
from IPython.core.profileapp import list_profiles_in
from IPython.core.profiledir import ProfileDir
from IPython.utils.path import get_ipython_dir
from IPython.utils.sysinfo import num_cpus
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class ClusterManager(LoggingConfigurable):
profiles = Dict()
delay = CFloat(1., config=True,
help="delay (in s) between starting the controller and the engines")
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
from zmq.eventloop.ioloop import IOLoop
return IOLoop.instance()
def load_cluster_config(self, profile_dir):
config_files = ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
config = load_pyconfig_files(config_files, profile_dir)
return config
def build_launchers(self, profile_dir):
config = self.load_cluster_config(profile_dir)
cont_clsname = config.IPClusterStart.get('controller_launcher_class','Local')
cont_class = find_launcher_class(cont_clsname,'Controller')
cl = cont_class(work_dir=u'.',config=config, profile_dir=profile_dir)
engine_clsname = config.IPClusterEngines.get('engine_launcher_class','Local')
engine_class = find_launcher_class(engine_clsname,'EngineSet')
esl = engine_class(work_dir=u'.',config=config, profile_dir=profile_dir)
n = config.IPClusterEngines.get('n', num_cpus())
n = getattr(esl, 'engine_count', n)
return cl, esl, n
def get_profile_dir(self, name, path):
p = ProfileDir.find_profile_dir_by_name(path,name=name)
return p.location
def update_profiles(self):
"""List all profiles in the ipython_dir and cwd.
"""
for path in [get_ipython_dir(), os.getcwdu()]:
for profile in list_profiles_in(path):
pd = self.get_profile_dir(profile, path)
if profile not in self.profiles:
self.log.debug("Overwriting profile %s" % profile)
self.profiles[profile] = {
'profile': profile,
'profile_dir': pd,
'status': 'stopped'
}
def list_profiles(self):
self.update_profiles()
result = [self.profile_info(p) for p in self.profiles.keys()]
return result
def check_profile(self, profile):
if profile not in self.profiles:
raise web.HTTPError(404, u'profile not found')
def profile_info(self, profile):
self.check_profile(profile)
result = {}
data = self.profiles.get(profile)
result['profile'] = profile
result['profile_dir'] = data['profile_dir']
result['status'] = data['status']
if 'n' in data:
result['n'] = data['n']
return result
def start_cluster(self, profile, n=None):
"""Start a cluster for a given profile."""
self.check_profile(profile)
data = self.profiles[profile]
if data['status'] == 'running':
raise web.HTTPError(409, u'cluster already running')
cl, esl, default_n = self.build_launchers(data['profile_dir'])
n = n if n is not None else default_n
def clean_data():
data.pop('controller_launcher',None)
data.pop('engine_set_launcher',None)
data.pop('n',None)
data['status'] = 'stopped'
def engines_stopped(r):
self.log.debug('Engines stopped')
if cl.running:
cl.stop()
clean_data()
esl.on_stop(engines_stopped)
def controller_stopped(r):
self.log.debug('Controller stopped')
if esl.running:
esl.stop()
clean_data()
cl.on_stop(controller_stopped)
dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
dc.start()
dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
dc.start()
self.log.debug('Cluster started')
data['controller_launcher'] = cl
data['engine_set_launcher'] = esl
data['n'] = n
data['status'] = 'running'
return self.profile_info(profile)
def stop_cluster(self, profile):
"""Stop a cluster for a given profile."""
self.check_profile(profile)
data = self.profiles[profile]
if data['status'] == 'stopped':
raise web.HTTPError(409, u'cluster not running')
data = self.profiles[profile]
cl = data['controller_launcher']
esl = data['engine_set_launcher']
if cl.running:
cl.stop()
if esl.running:
esl.stop()
# Return a temp info dict, the real one is updated in the on_stop
# logic above.
result = {
'profile': data['profile'],
'profile_dir': data['profile_dir'],
'status': 'stopped'
}
return result
def stop_all_clusters(self):
for p in self.profiles.keys():
self.stop_cluster(profile)