##// END OF EJS Templates
ensure 'default' is first in cluster profile list...
MinRK -
Show More
@@ -1,172 +1,174 b''
1 """Manage IPython.parallel clusters in the notebook.
1 """Manage IPython.parallel clusters in the notebook.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import os
19 import os
20
20
21 from tornado import web
21 from tornado import web
22 from zmq.eventloop import ioloop
22 from zmq.eventloop import ioloop
23
23
24 from IPython.config.configurable import LoggingConfigurable
24 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.loader import load_pyconfig_files
25 from IPython.config.loader import load_pyconfig_files
26 from IPython.utils.traitlets import Dict, Instance, CFloat
26 from IPython.utils.traitlets import Dict, Instance, CFloat
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
28 from IPython.core.profileapp import list_profiles_in
28 from IPython.core.profileapp import list_profiles_in
29 from IPython.core.profiledir import ProfileDir
29 from IPython.core.profiledir import ProfileDir
30 from IPython.utils.path import get_ipython_dir
30 from IPython.utils.path import get_ipython_dir
31 from IPython.utils.sysinfo import num_cpus
31 from IPython.utils.sysinfo import num_cpus
32
32
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Classes
35 # Classes
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
39 class DummyIPClusterStart(IPClusterStart):
39 class DummyIPClusterStart(IPClusterStart):
40 """Dummy subclass to skip init steps that conflict with global app.
40 """Dummy subclass to skip init steps that conflict with global app.
41
41
42 Instantiating and initializing this class should result in fully configured
42 Instantiating and initializing this class should result in fully configured
43 launchers, but no other side effects or state.
43 launchers, but no other side effects or state.
44 """
44 """
45
45
46 def init_signal(self):
46 def init_signal(self):
47 pass
47 pass
48 def reinit_logging(self):
48 def reinit_logging(self):
49 pass
49 pass
50
50
51
51
52 class ClusterManager(LoggingConfigurable):
52 class ClusterManager(LoggingConfigurable):
53
53
54 profiles = Dict()
54 profiles = Dict()
55
55
56 delay = CFloat(1., config=True,
56 delay = CFloat(1., config=True,
57 help="delay (in s) between starting the controller and the engines")
57 help="delay (in s) between starting the controller and the engines")
58
58
59 loop = Instance('zmq.eventloop.ioloop.IOLoop')
59 loop = Instance('zmq.eventloop.ioloop.IOLoop')
60 def _loop_default(self):
60 def _loop_default(self):
61 from zmq.eventloop.ioloop import IOLoop
61 from zmq.eventloop.ioloop import IOLoop
62 return IOLoop.instance()
62 return IOLoop.instance()
63
63
64 def build_launchers(self, profile_dir):
64 def build_launchers(self, profile_dir):
65 starter = DummyIPClusterStart(log=self.log)
65 starter = DummyIPClusterStart(log=self.log)
66 starter.initialize(['--profile-dir', profile_dir])
66 starter.initialize(['--profile-dir', profile_dir])
67 cl = starter.controller_launcher
67 cl = starter.controller_launcher
68 esl = starter.engine_launcher
68 esl = starter.engine_launcher
69 n = starter.n
69 n = starter.n
70 return cl, esl, n
70 return cl, esl, n
71
71
72 def get_profile_dir(self, name, path):
72 def get_profile_dir(self, name, path):
73 p = ProfileDir.find_profile_dir_by_name(path,name=name)
73 p = ProfileDir.find_profile_dir_by_name(path,name=name)
74 return p.location
74 return p.location
75
75
76 def update_profiles(self):
76 def update_profiles(self):
77 """List all profiles in the ipython_dir and cwd.
77 """List all profiles in the ipython_dir and cwd.
78 """
78 """
79 for path in [get_ipython_dir(), os.getcwdu()]:
79 for path in [get_ipython_dir(), os.getcwdu()]:
80 for profile in list_profiles_in(path):
80 for profile in list_profiles_in(path):
81 pd = self.get_profile_dir(profile, path)
81 pd = self.get_profile_dir(profile, path)
82 if profile not in self.profiles:
82 if profile not in self.profiles:
83 self.log.debug("Adding cluster profile '%s'" % profile)
83 self.log.debug("Adding cluster profile '%s'" % profile)
84 self.profiles[profile] = {
84 self.profiles[profile] = {
85 'profile': profile,
85 'profile': profile,
86 'profile_dir': pd,
86 'profile_dir': pd,
87 'status': 'stopped'
87 'status': 'stopped'
88 }
88 }
89
89
90 def list_profiles(self):
90 def list_profiles(self):
91 self.update_profiles()
91 self.update_profiles()
92 result = [self.profile_info(p) for p in sorted(self.profiles.keys())]
92 # sorted list, but ensure that 'default' always comes first
93 default_first = lambda name: name if name != 'default' else ''
94 result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
93 return result
95 return result
94
96
95 def check_profile(self, profile):
97 def check_profile(self, profile):
96 if profile not in self.profiles:
98 if profile not in self.profiles:
97 raise web.HTTPError(404, u'profile not found')
99 raise web.HTTPError(404, u'profile not found')
98
100
99 def profile_info(self, profile):
101 def profile_info(self, profile):
100 self.check_profile(profile)
102 self.check_profile(profile)
101 result = {}
103 result = {}
102 data = self.profiles.get(profile)
104 data = self.profiles.get(profile)
103 result['profile'] = profile
105 result['profile'] = profile
104 result['profile_dir'] = data['profile_dir']
106 result['profile_dir'] = data['profile_dir']
105 result['status'] = data['status']
107 result['status'] = data['status']
106 if 'n' in data:
108 if 'n' in data:
107 result['n'] = data['n']
109 result['n'] = data['n']
108 return result
110 return result
109
111
110 def start_cluster(self, profile, n=None):
112 def start_cluster(self, profile, n=None):
111 """Start a cluster for a given profile."""
113 """Start a cluster for a given profile."""
112 self.check_profile(profile)
114 self.check_profile(profile)
113 data = self.profiles[profile]
115 data = self.profiles[profile]
114 if data['status'] == 'running':
116 if data['status'] == 'running':
115 raise web.HTTPError(409, u'cluster already running')
117 raise web.HTTPError(409, u'cluster already running')
116 cl, esl, default_n = self.build_launchers(data['profile_dir'])
118 cl, esl, default_n = self.build_launchers(data['profile_dir'])
117 n = n if n is not None else default_n
119 n = n if n is not None else default_n
118 def clean_data():
120 def clean_data():
119 data.pop('controller_launcher',None)
121 data.pop('controller_launcher',None)
120 data.pop('engine_set_launcher',None)
122 data.pop('engine_set_launcher',None)
121 data.pop('n',None)
123 data.pop('n',None)
122 data['status'] = 'stopped'
124 data['status'] = 'stopped'
123 def engines_stopped(r):
125 def engines_stopped(r):
124 self.log.debug('Engines stopped')
126 self.log.debug('Engines stopped')
125 if cl.running:
127 if cl.running:
126 cl.stop()
128 cl.stop()
127 clean_data()
129 clean_data()
128 esl.on_stop(engines_stopped)
130 esl.on_stop(engines_stopped)
129 def controller_stopped(r):
131 def controller_stopped(r):
130 self.log.debug('Controller stopped')
132 self.log.debug('Controller stopped')
131 if esl.running:
133 if esl.running:
132 esl.stop()
134 esl.stop()
133 clean_data()
135 clean_data()
134 cl.on_stop(controller_stopped)
136 cl.on_stop(controller_stopped)
135
137
136 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
138 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
137 dc.start()
139 dc.start()
138 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
140 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
139 dc.start()
141 dc.start()
140
142
141 self.log.debug('Cluster started')
143 self.log.debug('Cluster started')
142 data['controller_launcher'] = cl
144 data['controller_launcher'] = cl
143 data['engine_set_launcher'] = esl
145 data['engine_set_launcher'] = esl
144 data['n'] = n
146 data['n'] = n
145 data['status'] = 'running'
147 data['status'] = 'running'
146 return self.profile_info(profile)
148 return self.profile_info(profile)
147
149
148 def stop_cluster(self, profile):
150 def stop_cluster(self, profile):
149 """Stop a cluster for a given profile."""
151 """Stop a cluster for a given profile."""
150 self.check_profile(profile)
152 self.check_profile(profile)
151 data = self.profiles[profile]
153 data = self.profiles[profile]
152 if data['status'] == 'stopped':
154 if data['status'] == 'stopped':
153 raise web.HTTPError(409, u'cluster not running')
155 raise web.HTTPError(409, u'cluster not running')
154 data = self.profiles[profile]
156 data = self.profiles[profile]
155 cl = data['controller_launcher']
157 cl = data['controller_launcher']
156 esl = data['engine_set_launcher']
158 esl = data['engine_set_launcher']
157 if cl.running:
159 if cl.running:
158 cl.stop()
160 cl.stop()
159 if esl.running:
161 if esl.running:
160 esl.stop()
162 esl.stop()
161 # Return a temp info dict, the real one is updated in the on_stop
163 # Return a temp info dict, the real one is updated in the on_stop
162 # logic above.
164 # logic above.
163 result = {
165 result = {
164 'profile': data['profile'],
166 'profile': data['profile'],
165 'profile_dir': data['profile_dir'],
167 'profile_dir': data['profile_dir'],
166 'status': 'stopped'
168 'status': 'stopped'
167 }
169 }
168 return result
170 return result
169
171
170 def stop_all_clusters(self):
172 def stop_all_clusters(self):
171 for p in self.profiles.keys():
173 for p in self.profiles.keys():
172 self.stop_cluster(p)
174 self.stop_cluster(p)
General Comments 0
You need to be logged in to leave comments. Login now