##// END OF EJS Templates
Merge pull request #1508 from minrk/i1507...
Min RK -
r6273:7db18f8a merge
parent child Browse files
Show More
@@ -1,175 +1,174 b''
1 """Manage IPython.parallel clusters in the notebook.
1 """Manage IPython.parallel clusters in the notebook.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import os
19 import os
20
20
21 from tornado import web
21 from tornado import web
22 from zmq.eventloop import ioloop
22 from zmq.eventloop import ioloop
23
23
24 from IPython.config.configurable import LoggingConfigurable
24 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.loader import load_pyconfig_files
25 from IPython.config.loader import load_pyconfig_files
26 from IPython.utils.traitlets import Dict, Instance, CFloat
26 from IPython.utils.traitlets import Dict, Instance, CFloat
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
28 from IPython.core.profileapp import list_profiles_in
28 from IPython.core.profileapp import list_profiles_in
29 from IPython.core.profiledir import ProfileDir
29 from IPython.core.profiledir import ProfileDir
30 from IPython.utils.path import get_ipython_dir
30 from IPython.utils.path import get_ipython_dir
31 from IPython.utils.sysinfo import num_cpus
31 from IPython.utils.sysinfo import num_cpus
32
32
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Classes
35 # Classes
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
39 class DummyIPClusterStart(IPClusterStart):
39 class DummyIPClusterStart(IPClusterStart):
40 """Dummy subclass to skip init steps that conflict with global app.
40 """Dummy subclass to skip init steps that conflict with global app.
41
41
42 Instantiating and initializing this class should result in fully configured
42 Instantiating and initializing this class should result in fully configured
43 launchers, but no other side effects or state.
43 launchers, but no other side effects or state.
44 """
44 """
45
45
46 def init_signal(self):
46 def init_signal(self):
47 pass
47 pass
48 def init_logging(self):
48 def init_logging(self):
49 pass
49 pass
50 def reinit_logging(self):
50 def reinit_logging(self):
51 pass
51 pass
52
52
53
53
54 class ClusterManager(LoggingConfigurable):
54 class ClusterManager(LoggingConfigurable):
55
55
56 profiles = Dict()
56 profiles = Dict()
57
57
58 delay = CFloat(1., config=True,
58 delay = CFloat(1., config=True,
59 help="delay (in s) between starting the controller and the engines")
59 help="delay (in s) between starting the controller and the engines")
60
60
61 loop = Instance('zmq.eventloop.ioloop.IOLoop')
61 loop = Instance('zmq.eventloop.ioloop.IOLoop')
62 def _loop_default(self):
62 def _loop_default(self):
63 from zmq.eventloop.ioloop import IOLoop
63 from zmq.eventloop.ioloop import IOLoop
64 return IOLoop.instance()
64 return IOLoop.instance()
65
65
66 def build_launchers(self, profile_dir):
66 def build_launchers(self, profile_dir):
67 starter = DummyIPClusterStart(log=self.log)
67 starter = DummyIPClusterStart(log=self.log)
68 starter.initialize(['--profile-dir', profile_dir])
68 starter.initialize(['--profile-dir', profile_dir])
69 cl = starter.controller_launcher
69 cl = starter.controller_launcher
70 esl = starter.engine_launcher
70 esl = starter.engine_launcher
71 n = starter.n
71 n = starter.n
72 return cl, esl, n
72 return cl, esl, n
73
73
74 def get_profile_dir(self, name, path):
74 def get_profile_dir(self, name, path):
75 p = ProfileDir.find_profile_dir_by_name(path,name=name)
75 p = ProfileDir.find_profile_dir_by_name(path,name=name)
76 return p.location
76 return p.location
77
77
78 def update_profiles(self):
78 def update_profiles(self):
79 """List all profiles in the ipython_dir and cwd.
79 """List all profiles in the ipython_dir and cwd.
80 """
80 """
81 for path in [get_ipython_dir(), os.getcwdu()]:
81 for path in [get_ipython_dir(), os.getcwdu()]:
82 for profile in list_profiles_in(path):
82 for profile in list_profiles_in(path):
83 pd = self.get_profile_dir(profile, path)
83 pd = self.get_profile_dir(profile, path)
84 if profile not in self.profiles:
84 if profile not in self.profiles:
85 self.log.debug("Overwriting profile %s" % profile)
85 self.log.debug("Overwriting profile %s" % profile)
86 self.profiles[profile] = {
86 self.profiles[profile] = {
87 'profile': profile,
87 'profile': profile,
88 'profile_dir': pd,
88 'profile_dir': pd,
89 'status': 'stopped'
89 'status': 'stopped'
90 }
90 }
91
91
92 def list_profiles(self):
92 def list_profiles(self):
93 self.update_profiles()
93 self.update_profiles()
94 result = [self.profile_info(p) for p in self.profiles.keys()]
94 result = [self.profile_info(p) for p in sorted(self.profiles.keys())]
95 result.sort()
96 return result
95 return result
97
96
98 def check_profile(self, profile):
97 def check_profile(self, profile):
99 if profile not in self.profiles:
98 if profile not in self.profiles:
100 raise web.HTTPError(404, u'profile not found')
99 raise web.HTTPError(404, u'profile not found')
101
100
102 def profile_info(self, profile):
101 def profile_info(self, profile):
103 self.check_profile(profile)
102 self.check_profile(profile)
104 result = {}
103 result = {}
105 data = self.profiles.get(profile)
104 data = self.profiles.get(profile)
106 result['profile'] = profile
105 result['profile'] = profile
107 result['profile_dir'] = data['profile_dir']
106 result['profile_dir'] = data['profile_dir']
108 result['status'] = data['status']
107 result['status'] = data['status']
109 if 'n' in data:
108 if 'n' in data:
110 result['n'] = data['n']
109 result['n'] = data['n']
111 return result
110 return result
112
111
113 def start_cluster(self, profile, n=None):
112 def start_cluster(self, profile, n=None):
114 """Start a cluster for a given profile."""
113 """Start a cluster for a given profile."""
115 self.check_profile(profile)
114 self.check_profile(profile)
116 data = self.profiles[profile]
115 data = self.profiles[profile]
117 if data['status'] == 'running':
116 if data['status'] == 'running':
118 raise web.HTTPError(409, u'cluster already running')
117 raise web.HTTPError(409, u'cluster already running')
119 cl, esl, default_n = self.build_launchers(data['profile_dir'])
118 cl, esl, default_n = self.build_launchers(data['profile_dir'])
120 n = n if n is not None else default_n
119 n = n if n is not None else default_n
121 def clean_data():
120 def clean_data():
122 data.pop('controller_launcher',None)
121 data.pop('controller_launcher',None)
123 data.pop('engine_set_launcher',None)
122 data.pop('engine_set_launcher',None)
124 data.pop('n',None)
123 data.pop('n',None)
125 data['status'] = 'stopped'
124 data['status'] = 'stopped'
126 def engines_stopped(r):
125 def engines_stopped(r):
127 self.log.debug('Engines stopped')
126 self.log.debug('Engines stopped')
128 if cl.running:
127 if cl.running:
129 cl.stop()
128 cl.stop()
130 clean_data()
129 clean_data()
131 esl.on_stop(engines_stopped)
130 esl.on_stop(engines_stopped)
132 def controller_stopped(r):
131 def controller_stopped(r):
133 self.log.debug('Controller stopped')
132 self.log.debug('Controller stopped')
134 if esl.running:
133 if esl.running:
135 esl.stop()
134 esl.stop()
136 clean_data()
135 clean_data()
137 cl.on_stop(controller_stopped)
136 cl.on_stop(controller_stopped)
138
137
139 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
138 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
140 dc.start()
139 dc.start()
141 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
140 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
142 dc.start()
141 dc.start()
143
142
144 self.log.debug('Cluster started')
143 self.log.debug('Cluster started')
145 data['controller_launcher'] = cl
144 data['controller_launcher'] = cl
146 data['engine_set_launcher'] = esl
145 data['engine_set_launcher'] = esl
147 data['n'] = n
146 data['n'] = n
148 data['status'] = 'running'
147 data['status'] = 'running'
149 return self.profile_info(profile)
148 return self.profile_info(profile)
150
149
151 def stop_cluster(self, profile):
150 def stop_cluster(self, profile):
152 """Stop a cluster for a given profile."""
151 """Stop a cluster for a given profile."""
153 self.check_profile(profile)
152 self.check_profile(profile)
154 data = self.profiles[profile]
153 data = self.profiles[profile]
155 if data['status'] == 'stopped':
154 if data['status'] == 'stopped':
156 raise web.HTTPError(409, u'cluster not running')
155 raise web.HTTPError(409, u'cluster not running')
157 data = self.profiles[profile]
156 data = self.profiles[profile]
158 cl = data['controller_launcher']
157 cl = data['controller_launcher']
159 esl = data['engine_set_launcher']
158 esl = data['engine_set_launcher']
160 if cl.running:
159 if cl.running:
161 cl.stop()
160 cl.stop()
162 if esl.running:
161 if esl.running:
163 esl.stop()
162 esl.stop()
164 # Return a temp info dict, the real one is updated in the on_stop
163 # Return a temp info dict, the real one is updated in the on_stop
165 # logic above.
164 # logic above.
166 result = {
165 result = {
167 'profile': data['profile'],
166 'profile': data['profile'],
168 'profile_dir': data['profile_dir'],
167 'profile_dir': data['profile_dir'],
169 'status': 'stopped'
168 'status': 'stopped'
170 }
169 }
171 return result
170 return result
172
171
173 def stop_all_clusters(self):
172 def stop_all_clusters(self):
174 for p in self.profiles.keys():
173 for p in self.profiles.keys():
175 self.stop_cluster(profile)
174 self.stop_cluster(profile)
General Comments 0
You need to be logged in to leave comments. Login now