##// END OF EJS Templates
Merge pull request #6607 from minrk/cluster-load-remove...
Matthias Bussonnier -
r18135:0d959525 merge
parent child Browse files
Show More
@@ -1,174 +1,162 b''
1 """Manage IPython.parallel clusters in the notebook.
1 """Manage IPython.parallel clusters in the notebook."""
2
2
3 Authors:
3 # Copyright (c) IPython Development Team.
4
4 # Distributed under the terms of the Modified BSD License.
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
18
5
19 from tornado import web
6 from tornado import web
20
7
21 from IPython.config.configurable import LoggingConfigurable
8 from IPython.config.configurable import LoggingConfigurable
22 from IPython.utils.traitlets import Dict, Instance, Float
9 from IPython.utils.traitlets import Dict, Instance, Float
23 from IPython.core.profileapp import list_profiles_in
10 from IPython.core.profileapp import list_profiles_in
24 from IPython.core.profiledir import ProfileDir
11 from IPython.core.profiledir import ProfileDir
25 from IPython.utils import py3compat
12 from IPython.utils import py3compat
26 from IPython.utils.path import get_ipython_dir
13 from IPython.utils.path import get_ipython_dir
27
14
28
15
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
33
34
35
36 class ClusterManager(LoggingConfigurable):
16 class ClusterManager(LoggingConfigurable):
37
17
38 profiles = Dict()
18 profiles = Dict()
39
19
40 delay = Float(1., config=True,
20 delay = Float(1., config=True,
41 help="delay (in s) between starting the controller and the engines")
21 help="delay (in s) between starting the controller and the engines")
42
22
43 loop = Instance('zmq.eventloop.ioloop.IOLoop')
23 loop = Instance('zmq.eventloop.ioloop.IOLoop')
44 def _loop_default(self):
24 def _loop_default(self):
45 from zmq.eventloop.ioloop import IOLoop
25 from zmq.eventloop.ioloop import IOLoop
46 return IOLoop.instance()
26 return IOLoop.instance()
47
27
48 def build_launchers(self, profile_dir):
28 def build_launchers(self, profile_dir):
49 from IPython.parallel.apps.ipclusterapp import IPClusterStart
29 from IPython.parallel.apps.ipclusterapp import IPClusterStart
50
30
51 class DummyIPClusterStart(IPClusterStart):
31 class DummyIPClusterStart(IPClusterStart):
52 """Dummy subclass to skip init steps that conflict with global app.
32 """Dummy subclass to skip init steps that conflict with global app.
53
33
54 Instantiating and initializing this class should result in fully configured
34 Instantiating and initializing this class should result in fully configured
55 launchers, but no other side effects or state.
35 launchers, but no other side effects or state.
56 """
36 """
57
37
58 def init_signal(self):
38 def init_signal(self):
59 pass
39 pass
60 def reinit_logging(self):
40 def reinit_logging(self):
61 pass
41 pass
62
42
63 starter = DummyIPClusterStart(log=self.log)
43 starter = DummyIPClusterStart(log=self.log)
64 starter.initialize(['--profile-dir', profile_dir])
44 starter.initialize(['--profile-dir', profile_dir])
65 cl = starter.controller_launcher
45 cl = starter.controller_launcher
66 esl = starter.engine_launcher
46 esl = starter.engine_launcher
67 n = starter.n
47 n = starter.n
68 return cl, esl, n
48 return cl, esl, n
69
49
70 def get_profile_dir(self, name, path):
50 def get_profile_dir(self, name, path):
71 p = ProfileDir.find_profile_dir_by_name(path,name=name)
51 p = ProfileDir.find_profile_dir_by_name(path,name=name)
72 return p.location
52 return p.location
73
53
74 def update_profiles(self):
54 def update_profiles(self):
75 """List all profiles in the ipython_dir and cwd.
55 """List all profiles in the ipython_dir and cwd.
76 """
56 """
57
58 stale = set(self.profiles)
77 for path in [get_ipython_dir(), py3compat.getcwd()]:
59 for path in [get_ipython_dir(), py3compat.getcwd()]:
78 for profile in list_profiles_in(path):
60 for profile in list_profiles_in(path):
61 if profile in stale:
62 stale.remove(profile)
79 pd = self.get_profile_dir(profile, path)
63 pd = self.get_profile_dir(profile, path)
80 if profile not in self.profiles:
64 if profile not in self.profiles:
81 self.log.debug("Adding cluster profile '%s'" % profile)
65 self.log.debug("Adding cluster profile '%s'", profile)
82 self.profiles[profile] = {
66 self.profiles[profile] = {
83 'profile': profile,
67 'profile': profile,
84 'profile_dir': pd,
68 'profile_dir': pd,
85 'status': 'stopped'
69 'status': 'stopped'
86 }
70 }
71 for profile in stale:
72 # remove profiles that no longer exist
73 self.log.debug("Profile '%s' no longer exists", profile)
74 self.profiles.pop(stale)
87
75
88 def list_profiles(self):
76 def list_profiles(self):
89 self.update_profiles()
77 self.update_profiles()
90 # sorted list, but ensure that 'default' always comes first
78 # sorted list, but ensure that 'default' always comes first
91 default_first = lambda name: name if name != 'default' else ''
79 default_first = lambda name: name if name != 'default' else ''
92 result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
80 result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
93 return result
81 return result
94
82
95 def check_profile(self, profile):
83 def check_profile(self, profile):
96 if profile not in self.profiles:
84 if profile not in self.profiles:
97 raise web.HTTPError(404, u'profile not found')
85 raise web.HTTPError(404, u'profile not found')
98
86
99 def profile_info(self, profile):
87 def profile_info(self, profile):
100 self.check_profile(profile)
88 self.check_profile(profile)
101 result = {}
89 result = {}
102 data = self.profiles.get(profile)
90 data = self.profiles.get(profile)
103 result['profile'] = profile
91 result['profile'] = profile
104 result['profile_dir'] = data['profile_dir']
92 result['profile_dir'] = data['profile_dir']
105 result['status'] = data['status']
93 result['status'] = data['status']
106 if 'n' in data:
94 if 'n' in data:
107 result['n'] = data['n']
95 result['n'] = data['n']
108 return result
96 return result
109
97
110 def start_cluster(self, profile, n=None):
98 def start_cluster(self, profile, n=None):
111 """Start a cluster for a given profile."""
99 """Start a cluster for a given profile."""
112 self.check_profile(profile)
100 self.check_profile(profile)
113 data = self.profiles[profile]
101 data = self.profiles[profile]
114 if data['status'] == 'running':
102 if data['status'] == 'running':
115 raise web.HTTPError(409, u'cluster already running')
103 raise web.HTTPError(409, u'cluster already running')
116 cl, esl, default_n = self.build_launchers(data['profile_dir'])
104 cl, esl, default_n = self.build_launchers(data['profile_dir'])
117 n = n if n is not None else default_n
105 n = n if n is not None else default_n
118 def clean_data():
106 def clean_data():
119 data.pop('controller_launcher',None)
107 data.pop('controller_launcher',None)
120 data.pop('engine_set_launcher',None)
108 data.pop('engine_set_launcher',None)
121 data.pop('n',None)
109 data.pop('n',None)
122 data['status'] = 'stopped'
110 data['status'] = 'stopped'
123 def engines_stopped(r):
111 def engines_stopped(r):
124 self.log.debug('Engines stopped')
112 self.log.debug('Engines stopped')
125 if cl.running:
113 if cl.running:
126 cl.stop()
114 cl.stop()
127 clean_data()
115 clean_data()
128 esl.on_stop(engines_stopped)
116 esl.on_stop(engines_stopped)
129 def controller_stopped(r):
117 def controller_stopped(r):
130 self.log.debug('Controller stopped')
118 self.log.debug('Controller stopped')
131 if esl.running:
119 if esl.running:
132 esl.stop()
120 esl.stop()
133 clean_data()
121 clean_data()
134 cl.on_stop(controller_stopped)
122 cl.on_stop(controller_stopped)
135 loop = self.loop
123 loop = self.loop
136
124
137 def start():
125 def start():
138 """start the controller, then the engines after a delay"""
126 """start the controller, then the engines after a delay"""
139 cl.start()
127 cl.start()
140 loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
128 loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
141 self.loop.add_callback(start)
129 self.loop.add_callback(start)
142
130
143 self.log.debug('Cluster started')
131 self.log.debug('Cluster started')
144 data['controller_launcher'] = cl
132 data['controller_launcher'] = cl
145 data['engine_set_launcher'] = esl
133 data['engine_set_launcher'] = esl
146 data['n'] = n
134 data['n'] = n
147 data['status'] = 'running'
135 data['status'] = 'running'
148 return self.profile_info(profile)
136 return self.profile_info(profile)
149
137
150 def stop_cluster(self, profile):
138 def stop_cluster(self, profile):
151 """Stop a cluster for a given profile."""
139 """Stop a cluster for a given profile."""
152 self.check_profile(profile)
140 self.check_profile(profile)
153 data = self.profiles[profile]
141 data = self.profiles[profile]
154 if data['status'] == 'stopped':
142 if data['status'] == 'stopped':
155 raise web.HTTPError(409, u'cluster not running')
143 raise web.HTTPError(409, u'cluster not running')
156 data = self.profiles[profile]
144 data = self.profiles[profile]
157 cl = data['controller_launcher']
145 cl = data['controller_launcher']
158 esl = data['engine_set_launcher']
146 esl = data['engine_set_launcher']
159 if cl.running:
147 if cl.running:
160 cl.stop()
148 cl.stop()
161 if esl.running:
149 if esl.running:
162 esl.stop()
150 esl.stop()
163 # Return a temp info dict, the real one is updated in the on_stop
151 # Return a temp info dict, the real one is updated in the on_stop
164 # logic above.
152 # logic above.
165 result = {
153 result = {
166 'profile': data['profile'],
154 'profile': data['profile'],
167 'profile_dir': data['profile_dir'],
155 'profile_dir': data['profile_dir'],
168 'status': 'stopped'
156 'status': 'stopped'
169 }
157 }
170 return result
158 return result
171
159
172 def stop_all_clusters(self):
160 def stop_all_clusters(self):
173 for p in self.profiles.keys():
161 for p in self.profiles.keys():
174 self.stop_cluster(p)
162 self.stop_cluster(p)
General Comments 0
You need to be logged in to leave comments. Login now