##// END OF EJS Templates
Notebook cluster manager now uses proper launchers.
Brian Granger -
Show More
@@ -676,3 +676,27 b' class KVArgParseConfigLoader(ArgParseConfigLoader):'
676 sub_parser.load_config(self.extra_args)
676 sub_parser.load_config(self.extra_args)
677 self.config._merge(sub_parser.config)
677 self.config._merge(sub_parser.config)
678 self.extra_args = sub_parser.extra_args
678 self.extra_args = sub_parser.extra_args
679
680
def load_pyconfig_files(config_files, path):
    """Load multiple Python config files, merging each of them in turn.

    Files are processed in the order given and every config that loads is
    merged into the accumulated result. Files that cannot be found are
    skipped; any other error raised while loading propagates to the caller.

    Parameters
    ----------
    config_files : list of str
        List of config file names to load and merge into the config.
    path : unicode
        The full path to the location of the config files.

    Returns
    -------
    config : Config
        The merged configuration (empty when no file could be loaded).
    """
    config = Config()
    for cf in config_files:
        loader = PyFileConfigLoader(cf, path=path)
        try:
            next_config = loader.load_config()
        except ConfigFileNotFound:
            # A missing file is not an error: move on to the next one.
            continue
        config._merge(next_config)
    return config
@@ -16,19 +16,20 b' Authors:'
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import datetime
20 import os
19 import os
21 import uuid
22 import glob
23
20
24 from tornado import web
21 from tornado import web
25 from zmq.eventloop import ioloop
22 from zmq.eventloop import ioloop
26
23
27 from IPython.config.configurable import LoggingConfigurable
24 from IPython.config.configurable import LoggingConfigurable
28 from IPython.utils.traitlets import Unicode, List, Dict, Bool
25 from IPython.config.loader import load_pyconfig_files
29 from IPython.parallel.apps.launcher import IPClusterLauncher
26 from IPython.utils.traitlets import Dict, Instance, CFloat
30 from IPython.core.profileapp import list_profiles_in, list_bundled_profiles
27 from IPython.parallel.apps.ipclusterapp import find_launcher_class
31 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
28 from IPython.core.profileapp import list_profiles_in
29 from IPython.core.profiledir import ProfileDir
30 from IPython.utils.path import get_ipython_dir
31 from IPython.utils.sysinfo import num_cpus
32
32
33
33 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34 # Classes
35 # Classes
@@ -38,55 +39,130 b' class ClusterManager(LoggingConfigurable):'
38
39
39 profiles = Dict()
40 profiles = Dict()
40
41
41
42 delay = CFloat(1., config=True,
42 def list_profile_names(self):
43 help="delay (in s) between starting the controller and the engines")
44
45 loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
    """Provide the default value of ``loop``: the global IOLoop instance."""
    # The import happens at call time, as in the original implementation.
    from zmq.eventloop import ioloop as zmq_ioloop
    return zmq_ioloop.IOLoop.instance()
49
def load_cluster_config(self, profile_dir):
    """Load and merge the cluster-related config files of a profile.

    The controller, engine and cluster config files are looked up in
    `profile_dir` and merged, in that order, into a single config object
    which is returned.
    """
    cluster_config_names = [
        'ipcontroller_config.py',
        'ipengine_config.py',
        'ipcluster_config.py',
    ]
    return load_pyconfig_files(cluster_config_names, profile_dir)
54
def build_launchers(self, profile_dir):
    """Create the controller and engine-set launchers for a profile.

    The launcher classes are taken from the profile's cluster config,
    falling back to the 'Local' launchers when none is configured.

    Returns a ``(controller_launcher, engine_set_launcher, n)`` tuple.
    ``n`` is the engine launcher's ``engine_count`` attribute when it has
    one, otherwise the configured ``IPClusterEngines.n``, otherwise the
    value of ``num_cpus()``.
    """
    config = self.load_cluster_config(profile_dir)

    controller_name = config.IPClusterStart.get(
        'controller_launcher_class', 'Local')
    controller_cls = find_launcher_class(controller_name, 'Controller')
    controller_launcher = controller_cls(
        work_dir=u'.', config=config, profile_dir=profile_dir)

    engines_name = config.IPClusterEngines.get(
        'engine_launcher_class', 'Local')
    engines_cls = find_launcher_class(engines_name, 'EngineSet')
    engine_set_launcher = engines_cls(
        work_dir=u'.', config=config, profile_dir=profile_dir)

    configured_n = config.IPClusterEngines.get('n', num_cpus())
    # A launcher that exposes its own engine_count takes precedence.
    engine_count = getattr(engine_set_launcher, 'engine_count', configured_n)
    return controller_launcher, engine_set_launcher, engine_count
67
def get_profile_dir(self, name, path):
    """Return the location of the profile directory `name` found via `path`."""
    return ProfileDir.find_profile_dir_by_name(path, name=name).location
71
def update_profiles(self):
    """Register all profiles found in the ipython_dir and cwd.

    Every profile that is not yet present in ``self.profiles`` is added
    with its profile directory and a 'stopped' status. Entries of profiles
    that are already known are left untouched.
    """
    for path in [get_ipython_dir(), os.getcwdu()]:
        for profile in list_profiles_in(path):
            pd = self.get_profile_dir(profile, path)
            if profile not in self.profiles:
                # This branch only adds new entries; nothing is overwritten.
                self.log.debug("Adding cluster profile '%s'" % profile)
                self.profiles[profile] = {
                    'profile': profile,
                    'profile_dir': pd,
                    'status': 'stopped'
                }
49
85
def list_profiles(self):
    """Refresh the known profiles and return an info dict for each one."""
    self.update_profiles()
    infos = []
    for name in self.profiles.keys():
        infos.append(self.profile_info(name))
    return infos
54
90
def check_profile(self, profile):
    """Raise a 404 HTTPError unless `profile` is a known profile."""
    if profile in self.profiles:
        return
    raise web.HTTPError(404, u'profile not found')
55
94
def profile_info(self, profile):
    """Return a dict describing a known profile.

    The result always holds the 'profile', 'profile_dir' and 'status'
    keys; 'n' is included only when it is recorded for the profile.
    An unknown profile is rejected by ``check_profile``.
    """
    self.check_profile(profile)
    data = self.profiles.get(profile)
    info = {
        'profile': profile,
        'profile_dir': data['profile_dir'],
        'status': data['status'],
    }
    if 'n' in data:
        info['n'] = data['n']
    return info
67
105
def start_cluster(self, profile, n=None):
    """Start a cluster for a given profile.

    The controller start is scheduled on ``self.loop`` immediately and the
    engine start ``self.delay`` seconds later. When `n` is None the engine
    count returned by ``build_launchers`` is used. Stop callbacks are
    registered on both launchers so that when either one stops, the other
    is stopped too and the profile's entry is reset to 'stopped'.

    Raises a 409 HTTPError if the profile's cluster is already running.
    Returns the profile info dict.
    """
    self.check_profile(profile)
    data = self.profiles[profile]
    if data['status'] == 'running':
        raise web.HTTPError(409, u'cluster already running')

    controller, engines, default_n = self.build_launchers(data['profile_dir'])
    if n is None:
        n = default_n

    def reset_profile_data():
        # Drop everything that only makes sense while the cluster runs.
        for key in ('controller_launcher', 'engine_set_launcher', 'n'):
            data.pop(key, None)
        data['status'] = 'stopped'

    def on_engines_stopped(r):
        self.log.debug('Engines stopped')
        if controller.running:
            controller.stop()
        reset_profile_data()

    def on_controller_stopped(r):
        self.log.debug('Controller stopped')
        if engines.running:
            engines.stop()
        reset_profile_data()

    engines.on_stop(on_engines_stopped)
    controller.on_stop(on_controller_stopped)

    controller_start = ioloop.DelayedCallback(
        lambda: controller.start(), 0, self.loop)
    controller_start.start()
    # self.delay is in seconds, DelayedCallback is given milliseconds.
    engines_start = ioloop.DelayedCallback(
        lambda: engines.start(n), 1000*self.delay, self.loop)
    engines_start.start()

    self.log.debug('Cluster started')
    data['controller_launcher'] = controller
    data['engine_set_launcher'] = engines
    data['n'] = n
    data['status'] = 'running'
    return self.profile_info(profile)
81
143
def stop_cluster(self, profile):
    """Stop a cluster for a given profile.

    Raises a 409 HTTPError if the profile's cluster is not running.
    Returns an info dict with the 'profile', 'profile_dir' and 'status'
    keys, the status being reported as 'stopped'.
    """
    self.check_profile(profile)
    data = self.profiles[profile]
    if data['status'] == 'stopped':
        raise web.HTTPError(409, u'cluster not running')

    # Both launchers are looked up before either one is stopped.
    launchers = (data['controller_launcher'], data['engine_set_launcher'])
    for launcher in launchers:
        if launcher.running:
            launcher.stop()

    # This is only a temporary info dict: the stored entry is updated by
    # the on_stop callbacks registered when the cluster was started.
    return {
        'profile': data['profile'],
        'profile_dir': data['profile_dir'],
        'status': 'stopped'
    }
89
165
def stop_all_clusters(self):
    """Stop every cluster that is currently running.

    Profiles whose status is not 'running' are skipped, because
    ``stop_cluster`` rejects a stopped profile with a 409 HTTPError.
    """
    # Iterate over a snapshot of the registry rather than the live dict.
    for name, data in list(self.profiles.items()):
        if data.get('status') == 'running':
            self.stop_cluster(name)
@@ -690,8 +690,11 b' class ClusterActionHandler(AuthenticatedHandler):'
def post(self, profile, action):
    """Start or stop the cluster of `profile`, depending on `action`.

    For the 'start' action the optional request argument ``n`` gives the
    number of engines. When it is missing or blank, ``start_cluster`` is
    called without an engine count. The value returned by the cluster
    manager is serialized with ``jsonapi.dumps`` and passed to ``finish``.
    """
    cm = self.application.cluster_manager
    if action == 'start':
        n = self.get_argument('n', default=None)
        # A blank value (an empty input field) means "not specified";
        # passing it to int() would raise ValueError.
        if n is None or not n.strip():
            data = cm.start_cluster(profile)
        else:
            data = cm.start_cluster(profile, int(n))
    if action == 'stop':
        data = cm.stop_cluster(profile)
    self.finish(jsonapi.dumps(data))
@@ -406,6 +406,7 b' class NotebookApp(BaseIPythonApplication):'
406 self.notebook_manager = NotebookManager(config=self.config, log=self.log)
406 self.notebook_manager = NotebookManager(config=self.config, log=self.log)
407 self.notebook_manager.list_notebooks()
407 self.notebook_manager.list_notebooks()
408 self.cluster_manager = ClusterManager(config=self.config, log=self.log)
408 self.cluster_manager = ClusterManager(config=self.config, log=self.log)
409 self.cluster_manager.update_profiles()
409
410
410 def init_logging(self):
411 def init_logging(self):
411 super(NotebookApp, self).init_logging()
412 super(NotebookApp, self).init_logging()
@@ -116,11 +116,9 b' var IPython = (function (IPython) {'
116 append(status_col);
116 append(status_col);
117 start_button.click(function (e) {
117 start_button.click(function (e) {
118 var n = that.element.find('.engine_num_input').val();
118 var n = that.element.find('.engine_num_input').val();
119 console.log(n);
119 if (!/^\d+$/.test(n) && n.length>0) {
120 if (!/^\d+$/.test(n)) {
121 status_col.html('invalid engine #');
120 status_col.html('invalid engine #');
122 } else {
121 } else {
123 console.log('ajax...');
124 var settings = {
122 var settings = {
125 cache : false,
123 cache : false,
126 data : {n:n},
124 data : {n:n},
@@ -106,8 +106,35 b' NO_CLUSTER = 12'
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Utilities
110 #-----------------------------------------------------------------------------
111
def find_launcher_class(clsname, kind):
    """Return the launcher class for a given clsname and kind.

    Parameters
    ----------
    clsname : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
        WindowsHPC).
    kind : str
        Either 'EngineSet' or 'Controller'.

    Returns
    -------
    The object obtained by importing the resolved class name.
    """
    if '.' in clsname:
        # Already a dotted import path: use it unchanged.
        return import_item(clsname)
    # No module path given, presume the class lives in apps.launcher.
    if kind and kind not in clsname:
        # Only a prefix such as 'PBS' or 'MPI': expand it to the full
        # class name for the requested kind.
        clsname = clsname + kind + 'Launcher'
    return import_item('IPython.parallel.apps.launcher.' + clsname)
133
134 #-----------------------------------------------------------------------------
109 # Main application
135 # Main application
110 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137
111 start_help = """Start an IPython cluster for parallel computing
138 start_help = """Start an IPython cluster for parallel computing
112
139
113 Start an ipython cluster by its profile name or cluster
140 Start an ipython cluster by its profile name or cluster
@@ -303,15 +330,8 b' class IPClusterEngines(BaseParallelApplication):'
303
330
304 def build_launcher(self, clsname, kind=None):
331 def build_launcher(self, clsname, kind=None):
305 """import and instantiate a Launcher based on importstring"""
332 """import and instantiate a Launcher based on importstring"""
306 if '.' not in clsname:
307 # not a module, presume it's the raw name in apps.launcher
308 if kind and kind not in clsname:
309 # doesn't match necessary full class name, assume it's
310 # just 'PBS' or 'MPI' prefix:
311 clsname = clsname + kind + 'Launcher'
312 clsname = 'IPython.parallel.apps.launcher.'+clsname
313 try:
333 try:
314 klass = import_item(clsname)
334 klass = find_launcher_class(clsname, kind)
315 except (ImportError, KeyError):
335 except (ImportError, KeyError):
316 self.log.fatal("Could not import launcher class: %r"%clsname)
336 self.log.fatal("Could not import launcher class: %r"%clsname)
317 self.exit(1)
337 self.exit(1)
@@ -1167,12 +1167,12 b' class IPClusterLauncher(LocalProcessLauncher):'
1167 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1167 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1168 help="Command line arguments to pass to ipcluster.")
1168 help="Command line arguments to pass to ipcluster.")
1169 ipcluster_subcommand = Unicode('start')
1169 ipcluster_subcommand = Unicode('start')
1170 ipcluster_profile = Unicode('default')
1170 profile = Unicode('default')
1171 ipcluster_n = Integer(2)
1171 n = Integer(2)
1172
1172
def find_args(self):
    """Return the argument list: command, subcommand, n/profile flags, extra args."""
    args = self.ipcluster_cmd + [self.ipcluster_subcommand]
    args = args + ['--n=%i' % self.n, '--profile=%s' % self.profile]
    return args + self.ipcluster_args
1177
1177
1178 def start(self):
1178 def start(self):
General Comments 0
You need to be logged in to leave comments. Login now