##// END OF EJS Templates
Merge pull request #6607 from minrk/cluster-load-remove...
Matthias Bussonnier -
r18135:0d959525 merge
parent child Browse files
Show More
@@ -1,174 +1,162
1 """Manage IPython.parallel clusters in the notebook.
1 """Manage IPython.parallel clusters in the notebook."""
2 2
3 Authors:
4
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
18 5
19 6 from tornado import web
20 7
21 8 from IPython.config.configurable import LoggingConfigurable
22 9 from IPython.utils.traitlets import Dict, Instance, Float
23 10 from IPython.core.profileapp import list_profiles_in
24 11 from IPython.core.profiledir import ProfileDir
25 12 from IPython.utils import py3compat
26 13 from IPython.utils.path import get_ipython_dir
27 14
28 15
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
33
34
35
36 16 class ClusterManager(LoggingConfigurable):
37 17
38 18 profiles = Dict()
39 19
40 20 delay = Float(1., config=True,
41 21 help="delay (in s) between starting the controller and the engines")
42 22
43 23 loop = Instance('zmq.eventloop.ioloop.IOLoop')
44 24 def _loop_default(self):
45 25 from zmq.eventloop.ioloop import IOLoop
46 26 return IOLoop.instance()
47 27
48 28 def build_launchers(self, profile_dir):
49 29 from IPython.parallel.apps.ipclusterapp import IPClusterStart
50 30
51 31 class DummyIPClusterStart(IPClusterStart):
52 32 """Dummy subclass to skip init steps that conflict with global app.
53 33
54 34 Instantiating and initializing this class should result in fully configured
55 35 launchers, but no other side effects or state.
56 36 """
57 37
58 38 def init_signal(self):
59 39 pass
60 40 def reinit_logging(self):
61 41 pass
62 42
63 43 starter = DummyIPClusterStart(log=self.log)
64 44 starter.initialize(['--profile-dir', profile_dir])
65 45 cl = starter.controller_launcher
66 46 esl = starter.engine_launcher
67 47 n = starter.n
68 48 return cl, esl, n
69 49
70 50 def get_profile_dir(self, name, path):
71 51 p = ProfileDir.find_profile_dir_by_name(path,name=name)
72 52 return p.location
73 53
74 54 def update_profiles(self):
75 55 """List all profiles in the ipython_dir and cwd.
76 56 """
57
58 stale = set(self.profiles)
77 59 for path in [get_ipython_dir(), py3compat.getcwd()]:
78 60 for profile in list_profiles_in(path):
61 if profile in stale:
62 stale.remove(profile)
79 63 pd = self.get_profile_dir(profile, path)
80 64 if profile not in self.profiles:
81 self.log.debug("Adding cluster profile '%s'" % profile)
65 self.log.debug("Adding cluster profile '%s'", profile)
82 66 self.profiles[profile] = {
83 67 'profile': profile,
84 68 'profile_dir': pd,
85 69 'status': 'stopped'
86 70 }
71 for profile in stale:
72 # remove profiles that no longer exist
73 self.log.debug("Profile '%s' no longer exists", profile)
74 self.profiles.pop(stale)
87 75
88 76 def list_profiles(self):
89 77 self.update_profiles()
90 78 # sorted list, but ensure that 'default' always comes first
91 79 default_first = lambda name: name if name != 'default' else ''
92 80 result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
93 81 return result
94 82
95 83 def check_profile(self, profile):
96 84 if profile not in self.profiles:
97 85 raise web.HTTPError(404, u'profile not found')
98 86
99 87 def profile_info(self, profile):
100 88 self.check_profile(profile)
101 89 result = {}
102 90 data = self.profiles.get(profile)
103 91 result['profile'] = profile
104 92 result['profile_dir'] = data['profile_dir']
105 93 result['status'] = data['status']
106 94 if 'n' in data:
107 95 result['n'] = data['n']
108 96 return result
109 97
110 98 def start_cluster(self, profile, n=None):
111 99 """Start a cluster for a given profile."""
112 100 self.check_profile(profile)
113 101 data = self.profiles[profile]
114 102 if data['status'] == 'running':
115 103 raise web.HTTPError(409, u'cluster already running')
116 104 cl, esl, default_n = self.build_launchers(data['profile_dir'])
117 105 n = n if n is not None else default_n
118 106 def clean_data():
119 107 data.pop('controller_launcher',None)
120 108 data.pop('engine_set_launcher',None)
121 109 data.pop('n',None)
122 110 data['status'] = 'stopped'
123 111 def engines_stopped(r):
124 112 self.log.debug('Engines stopped')
125 113 if cl.running:
126 114 cl.stop()
127 115 clean_data()
128 116 esl.on_stop(engines_stopped)
129 117 def controller_stopped(r):
130 118 self.log.debug('Controller stopped')
131 119 if esl.running:
132 120 esl.stop()
133 121 clean_data()
134 122 cl.on_stop(controller_stopped)
135 123 loop = self.loop
136 124
137 125 def start():
138 126 """start the controller, then the engines after a delay"""
139 127 cl.start()
140 128 loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
141 129 self.loop.add_callback(start)
142 130
143 131 self.log.debug('Cluster started')
144 132 data['controller_launcher'] = cl
145 133 data['engine_set_launcher'] = esl
146 134 data['n'] = n
147 135 data['status'] = 'running'
148 136 return self.profile_info(profile)
149 137
150 138 def stop_cluster(self, profile):
151 139 """Stop a cluster for a given profile."""
152 140 self.check_profile(profile)
153 141 data = self.profiles[profile]
154 142 if data['status'] == 'stopped':
155 143 raise web.HTTPError(409, u'cluster not running')
156 144 data = self.profiles[profile]
157 145 cl = data['controller_launcher']
158 146 esl = data['engine_set_launcher']
159 147 if cl.running:
160 148 cl.stop()
161 149 if esl.running:
162 150 esl.stop()
163 151 # Return a temp info dict, the real one is updated in the on_stop
164 152 # logic above.
165 153 result = {
166 154 'profile': data['profile'],
167 155 'profile_dir': data['profile_dir'],
168 156 'status': 'stopped'
169 157 }
170 158 return result
171 159
172 160 def stop_all_clusters(self):
173 161 for p in self.profiles.keys():
174 162 self.stop_cluster(p)
General Comments 0
You need to be logged in to leave comments. Login now