##// END OF EJS Templates
Backport PR #8303: fix bug in stale profile clean up for clusters...
Min RK -
Show More
@@ -1,162 +1,162 b''
1 1 """Manage IPython.parallel clusters in the notebook."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from tornado import web
7 7
8 8 from IPython.config.configurable import LoggingConfigurable
9 9 from IPython.utils.traitlets import Dict, Instance, Float
10 10 from IPython.core.profileapp import list_profiles_in
11 11 from IPython.core.profiledir import ProfileDir
12 12 from IPython.utils import py3compat
13 13 from IPython.utils.path import get_ipython_dir
14 14
15 15
class ClusterManager(LoggingConfigurable):
    """Track IPython.parallel cluster profiles and start/stop their clusters.

    ``profiles`` maps a profile name to a dict with the keys ``'profile'``,
    ``'profile_dir'`` and ``'status'`` (``'stopped'`` or ``'running'``).
    While a cluster is running, the dict additionally holds
    ``'controller_launcher'``, ``'engine_set_launcher'`` and ``'n'``.
    """

    profiles = Dict()

    delay = Float(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        """Default for the ``loop`` trait: the global zmq IOLoop instance."""
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    def build_launchers(self, profile_dir):
        """Build the launchers configured for the profile at ``profile_dir``.

        Returns a ``(controller_launcher, engine_launcher, n)`` tuple taken
        from an initialized ``IPClusterStart`` application.
        """
        from IPython.parallel.apps.ipclusterapp import IPClusterStart

        class DummyIPClusterStart(IPClusterStart):
            """Dummy subclass to skip init steps that conflict with global app.

            Instantiating and initializing this class should result in fully configured
            launchers, but no other side effects or state.
            """

            def init_signal(self):
                pass
            def reinit_logging(self):
                pass

        starter = DummyIPClusterStart(log=self.log)
        starter.initialize(['--profile-dir', profile_dir])
        cl = starter.controller_launcher
        esl = starter.engine_launcher
        n = starter.n
        return cl, esl, n

    def get_profile_dir(self, name, path):
        """Return the location of the profile called ``name`` under ``path``."""
        p = ProfileDir.find_profile_dir_by_name(path, name=name)
        return p.location

    def update_profiles(self):
        """Synchronize ``self.profiles`` with the profiles found on disk.

        Profiles are looked up in the IPython directory and in the current
        working directory.  Newly found profiles are registered as
        ``'stopped'``; known profiles that are no longer found are dropped.
        """
        # Everything we currently know about is stale until seen again.
        stale = set(self.profiles)
        for path in [get_ipython_dir(), py3compat.getcwd()]:
            for profile in list_profiles_in(path):
                stale.discard(profile)
                if profile not in self.profiles:
                    self.log.debug("Adding cluster profile '%s'", profile)
                    # Only resolve the directory for profiles we are adding;
                    # the value is not needed for already-known profiles.
                    self.profiles[profile] = {
                        'profile': profile,
                        'profile_dir': self.get_profile_dir(profile, path),
                        'status': 'stopped'
                    }
        for profile in stale:
            # remove profiles that no longer exist
            self.log.debug("Profile '%s' no longer exists", profile)
            # Pop the individual name; popping the `stale` set itself would
            # raise TypeError because a set is not hashable.
            self.profiles.pop(profile)

    def list_profiles(self):
        """Refresh the profiles and return their info dicts, sorted by name.

        The ``'default'`` profile, when present, always comes first.
        """
        self.update_profiles()
        # sorted list, but ensure that 'default' always comes first
        default_first = lambda name: name if name != 'default' else ''
        result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
        return result

    def check_profile(self, profile):
        """Raise a 404 ``web.HTTPError`` if ``profile`` is not a known profile."""
        if profile not in self.profiles:
            raise web.HTTPError(404, u'profile not found')

    def profile_info(self, profile):
        """Return the public info dict for ``profile``.

        The result holds ``'profile'``, ``'profile_dir'`` and ``'status'``,
        plus ``'n'`` when it is recorded for the profile.  Launcher objects
        are deliberately left out.

        Raises a 404 ``web.HTTPError`` if the profile is unknown.
        """
        self.check_profile(profile)
        data = self.profiles[profile]
        result = {
            'profile': profile,
            'profile_dir': data['profile_dir'],
            'status': data['status'],
        }
        if 'n' in data:
            result['n'] = data['n']
        return result

    def start_cluster(self, profile, n=None):
        """Start a cluster for a given profile.

        ``n`` is handed to the engine launcher's ``start``; when it is None
        the value configured for the profile is used.

        Returns the profile's info dict (see ``profile_info``).  Raises a 404
        ``web.HTTPError`` for an unknown profile and a 409 if the cluster is
        already marked as running.
        """
        self.check_profile(profile)
        data = self.profiles[profile]
        if data['status'] == 'running':
            raise web.HTTPError(409, u'cluster already running')
        cl, esl, default_n = self.build_launchers(data['profile_dir'])
        n = n if n is not None else default_n

        def clean_data():
            # Forget the launchers and mark the profile as stopped.  Safe to
            # call more than once: both stop callbacks below invoke it.
            data.pop('controller_launcher', None)
            data.pop('engine_set_launcher', None)
            data.pop('n', None)
            data['status'] = 'stopped'

        def engines_stopped(r):
            # When the engines stop, take the controller down with them.
            self.log.debug('Engines stopped')
            if cl.running:
                cl.stop()
            clean_data()
        esl.on_stop(engines_stopped)

        def controller_stopped(r):
            # When the controller stops, take the engines down with it.
            self.log.debug('Controller stopped')
            if esl.running:
                esl.stop()
            clean_data()
        cl.on_stop(controller_stopped)
        loop = self.loop

        def start():
            """start the controller, then the engines after a delay"""
            cl.start()
            loop.add_timeout(loop.time() + self.delay, lambda: esl.start(n))
        # The launch itself is scheduled on the event loop; the bookkeeping
        # below is recorded immediately.
        loop.add_callback(start)

        self.log.debug('Cluster started')
        data['controller_launcher'] = cl
        data['engine_set_launcher'] = esl
        data['n'] = n
        data['status'] = 'running'
        return self.profile_info(profile)

    def stop_cluster(self, profile):
        """Stop a cluster for a given profile.

        Returns a temporary info dict with status ``'stopped'``.  Raises a
        404 ``web.HTTPError`` for an unknown profile and a 409 if the cluster
        is not running.
        """
        self.check_profile(profile)
        data = self.profiles[profile]
        if data['status'] == 'stopped':
            raise web.HTTPError(409, u'cluster not running')
        cl = data['controller_launcher']
        esl = data['engine_set_launcher']
        if cl.running:
            cl.stop()
        if esl.running:
            esl.stop()
        # Return a temp info dict, the real one is updated in the on_stop
        # callbacks registered by start_cluster.
        result = {
            'profile': data['profile'],
            'profile_dir': data['profile_dir'],
            'status': 'stopped'
        }
        return result

    def stop_all_clusters(self):
        """Stop every cluster that is currently marked as running."""
        # stop_cluster raises a 409 for profiles that are not running, so
        # calling it on every profile would abort at the first stopped one.
        # Iterate over a snapshot so the loop never walks a live dict view.
        for name, data in list(self.profiles.items()):
            if data['status'] == 'running':
                self.stop_cluster(name)
General Comments 0
You need to be logged in to leave comments. Login now