##// END OF EJS Templates
Change distressing / inaccurate debug message about overwriting profiles
MinRK -
Show More
@@ -1,172 +1,172 b''
1 """Manage IPython.parallel clusters in the notebook.
1 """Manage IPython.parallel clusters in the notebook.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import os
19 import os
20
20
21 from tornado import web
21 from tornado import web
22 from zmq.eventloop import ioloop
22 from zmq.eventloop import ioloop
23
23
24 from IPython.config.configurable import LoggingConfigurable
24 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.loader import load_pyconfig_files
25 from IPython.config.loader import load_pyconfig_files
26 from IPython.utils.traitlets import Dict, Instance, CFloat
26 from IPython.utils.traitlets import Dict, Instance, CFloat
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
28 from IPython.core.profileapp import list_profiles_in
28 from IPython.core.profileapp import list_profiles_in
29 from IPython.core.profiledir import ProfileDir
29 from IPython.core.profiledir import ProfileDir
30 from IPython.utils.path import get_ipython_dir
30 from IPython.utils.path import get_ipython_dir
31 from IPython.utils.sysinfo import num_cpus
31 from IPython.utils.sysinfo import num_cpus
32
32
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Classes
35 # Classes
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
39 class DummyIPClusterStart(IPClusterStart):
39 class DummyIPClusterStart(IPClusterStart):
40 """Dummy subclass to skip init steps that conflict with global app.
40 """Dummy subclass to skip init steps that conflict with global app.
41
41
42 Instantiating and initializing this class should result in fully configured
42 Instantiating and initializing this class should result in fully configured
43 launchers, but no other side effects or state.
43 launchers, but no other side effects or state.
44 """
44 """
45
45
46 def init_signal(self):
46 def init_signal(self):
47 pass
47 pass
48 def reinit_logging(self):
48 def reinit_logging(self):
49 pass
49 pass
50
50
51
51
52 class ClusterManager(LoggingConfigurable):
52 class ClusterManager(LoggingConfigurable):
53
53
54 profiles = Dict()
54 profiles = Dict()
55
55
56 delay = CFloat(1., config=True,
56 delay = CFloat(1., config=True,
57 help="delay (in s) between starting the controller and the engines")
57 help="delay (in s) between starting the controller and the engines")
58
58
59 loop = Instance('zmq.eventloop.ioloop.IOLoop')
59 loop = Instance('zmq.eventloop.ioloop.IOLoop')
60 def _loop_default(self):
60 def _loop_default(self):
61 from zmq.eventloop.ioloop import IOLoop
61 from zmq.eventloop.ioloop import IOLoop
62 return IOLoop.instance()
62 return IOLoop.instance()
63
63
64 def build_launchers(self, profile_dir):
64 def build_launchers(self, profile_dir):
65 starter = DummyIPClusterStart(log=self.log)
65 starter = DummyIPClusterStart(log=self.log)
66 starter.initialize(['--profile-dir', profile_dir])
66 starter.initialize(['--profile-dir', profile_dir])
67 cl = starter.controller_launcher
67 cl = starter.controller_launcher
68 esl = starter.engine_launcher
68 esl = starter.engine_launcher
69 n = starter.n
69 n = starter.n
70 return cl, esl, n
70 return cl, esl, n
71
71
72 def get_profile_dir(self, name, path):
72 def get_profile_dir(self, name, path):
73 p = ProfileDir.find_profile_dir_by_name(path,name=name)
73 p = ProfileDir.find_profile_dir_by_name(path,name=name)
74 return p.location
74 return p.location
75
75
76 def update_profiles(self):
76 def update_profiles(self):
77 """List all profiles in the ipython_dir and cwd.
77 """List all profiles in the ipython_dir and cwd.
78 """
78 """
79 for path in [get_ipython_dir(), os.getcwdu()]:
79 for path in [get_ipython_dir(), os.getcwdu()]:
80 for profile in list_profiles_in(path):
80 for profile in list_profiles_in(path):
81 pd = self.get_profile_dir(profile, path)
81 pd = self.get_profile_dir(profile, path)
82 if profile not in self.profiles:
82 if profile not in self.profiles:
83 self.log.debug("Overwriting profile %s" % profile)
83 self.log.debug("Adding cluster profile '%s'" % profile)
84 self.profiles[profile] = {
84 self.profiles[profile] = {
85 'profile': profile,
85 'profile': profile,
86 'profile_dir': pd,
86 'profile_dir': pd,
87 'status': 'stopped'
87 'status': 'stopped'
88 }
88 }
89
89
90 def list_profiles(self):
90 def list_profiles(self):
91 self.update_profiles()
91 self.update_profiles()
92 result = [self.profile_info(p) for p in sorted(self.profiles.keys())]
92 result = [self.profile_info(p) for p in sorted(self.profiles.keys())]
93 return result
93 return result
94
94
95 def check_profile(self, profile):
95 def check_profile(self, profile):
96 if profile not in self.profiles:
96 if profile not in self.profiles:
97 raise web.HTTPError(404, u'profile not found')
97 raise web.HTTPError(404, u'profile not found')
98
98
99 def profile_info(self, profile):
99 def profile_info(self, profile):
100 self.check_profile(profile)
100 self.check_profile(profile)
101 result = {}
101 result = {}
102 data = self.profiles.get(profile)
102 data = self.profiles.get(profile)
103 result['profile'] = profile
103 result['profile'] = profile
104 result['profile_dir'] = data['profile_dir']
104 result['profile_dir'] = data['profile_dir']
105 result['status'] = data['status']
105 result['status'] = data['status']
106 if 'n' in data:
106 if 'n' in data:
107 result['n'] = data['n']
107 result['n'] = data['n']
108 return result
108 return result
109
109
110 def start_cluster(self, profile, n=None):
110 def start_cluster(self, profile, n=None):
111 """Start a cluster for a given profile."""
111 """Start a cluster for a given profile."""
112 self.check_profile(profile)
112 self.check_profile(profile)
113 data = self.profiles[profile]
113 data = self.profiles[profile]
114 if data['status'] == 'running':
114 if data['status'] == 'running':
115 raise web.HTTPError(409, u'cluster already running')
115 raise web.HTTPError(409, u'cluster already running')
116 cl, esl, default_n = self.build_launchers(data['profile_dir'])
116 cl, esl, default_n = self.build_launchers(data['profile_dir'])
117 n = n if n is not None else default_n
117 n = n if n is not None else default_n
118 def clean_data():
118 def clean_data():
119 data.pop('controller_launcher',None)
119 data.pop('controller_launcher',None)
120 data.pop('engine_set_launcher',None)
120 data.pop('engine_set_launcher',None)
121 data.pop('n',None)
121 data.pop('n',None)
122 data['status'] = 'stopped'
122 data['status'] = 'stopped'
123 def engines_stopped(r):
123 def engines_stopped(r):
124 self.log.debug('Engines stopped')
124 self.log.debug('Engines stopped')
125 if cl.running:
125 if cl.running:
126 cl.stop()
126 cl.stop()
127 clean_data()
127 clean_data()
128 esl.on_stop(engines_stopped)
128 esl.on_stop(engines_stopped)
129 def controller_stopped(r):
129 def controller_stopped(r):
130 self.log.debug('Controller stopped')
130 self.log.debug('Controller stopped')
131 if esl.running:
131 if esl.running:
132 esl.stop()
132 esl.stop()
133 clean_data()
133 clean_data()
134 cl.on_stop(controller_stopped)
134 cl.on_stop(controller_stopped)
135
135
136 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
136 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
137 dc.start()
137 dc.start()
138 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
138 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
139 dc.start()
139 dc.start()
140
140
141 self.log.debug('Cluster started')
141 self.log.debug('Cluster started')
142 data['controller_launcher'] = cl
142 data['controller_launcher'] = cl
143 data['engine_set_launcher'] = esl
143 data['engine_set_launcher'] = esl
144 data['n'] = n
144 data['n'] = n
145 data['status'] = 'running'
145 data['status'] = 'running'
146 return self.profile_info(profile)
146 return self.profile_info(profile)
147
147
148 def stop_cluster(self, profile):
148 def stop_cluster(self, profile):
149 """Stop a cluster for a given profile."""
149 """Stop a cluster for a given profile."""
150 self.check_profile(profile)
150 self.check_profile(profile)
151 data = self.profiles[profile]
151 data = self.profiles[profile]
152 if data['status'] == 'stopped':
152 if data['status'] == 'stopped':
153 raise web.HTTPError(409, u'cluster not running')
153 raise web.HTTPError(409, u'cluster not running')
154 data = self.profiles[profile]
154 data = self.profiles[profile]
155 cl = data['controller_launcher']
155 cl = data['controller_launcher']
156 esl = data['engine_set_launcher']
156 esl = data['engine_set_launcher']
157 if cl.running:
157 if cl.running:
158 cl.stop()
158 cl.stop()
159 if esl.running:
159 if esl.running:
160 esl.stop()
160 esl.stop()
161 # Return a temp info dict, the real one is updated in the on_stop
161 # Return a temp info dict, the real one is updated in the on_stop
162 # logic above.
162 # logic above.
163 result = {
163 result = {
164 'profile': data['profile'],
164 'profile': data['profile'],
165 'profile_dir': data['profile_dir'],
165 'profile_dir': data['profile_dir'],
166 'status': 'stopped'
166 'status': 'stopped'
167 }
167 }
168 return result
168 return result
169
169
170 def stop_all_clusters(self):
170 def stop_all_clusters(self):
171 for p in self.profiles.keys():
171 for p in self.profiles.keys():
172 self.stop_cluster(p)
172 self.stop_cluster(p)
General Comments 0
You need to be logged in to leave comments. Login now