##// END OF EJS Templates
More work on autorestarting.
Brian Granger -
Show More
@@ -1,62 +1,77 b''
1 1 """A basic in process kernel monitor with autorestarting.
2 2
3 3 This watches a kernel's state using KernelManager.is_alive and auto
4 4 restarts the kernel if it dies.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2013 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop
20 20
21 21
22 22 from IPython.config.configurable import LoggingConfigurable
23 23 from IPython.utils.traitlets import (
24 24 Instance, Float
25 25 )
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Code
29 29 #-----------------------------------------------------------------------------
30 30
31 31 class KernelRestarter(LoggingConfigurable):
32 32 """Monitor and autorestart a kernel."""
33 33
34 34 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
35 35 def _loop_default(self):
36 36 return ioloop.IOLoop.instance()
37 37
38 38 kernel_manager = Instance('IPython.kernel.kernelmanager.KernelManager')
39 39
40 40 time_to_dead = Float(3.0, config=True,
41 41 help="""Kernel heartbeat interval in seconds."""
42 42 )
43 43
44 _pcallback = None
45
44 46 def __init__(self, **kwargs):
45 47 super(KernelRestarter, self).__init__(**kwargs)
46 48
47 49 def start(self):
48 self.pc = ioloop.PeriodicCallback(self.poll, self.time_to_dead, self.ioloop)
49 self.pc.start()
50 """Start the polling of the kernel."""
51 if self._pcallback is None:
52 self._pcallback = ioloop.PeriodicCallback(
53 self._poll, 1000*self.time_to_dead, self.ioloop
54 )
55 self._pcallback.start()
56
57 def stop(self):
58 """Stop the kernel polling."""
59 if self._pcallback is not None:
60 self._pcallback.stop()
50 61
51 def poll(self):
62 def clear(self):
63 """Clear the underlying PeriodicCallback."""
64 self.stop()
65 if self._pcallback is not None:
66 self._pcallback = None
67
68 def _poll(self):
52 69 if not self.kernel_manager.is_alive():
53 70 self.stop()
54 71 # This restart event should leave the connection file in place so
55 72 # the ports are the same. Because this takes place below the
56 73 # MappingKernelManager, the kernel_id will also remain the same.
74 self.log('KernelRestarter: restarting kernel')
57 75 self.kernel_manager.restart_kernel(now=True);
58 76 self.start()
59 77
60 def stop(self):
61 self.pc.stop()
62 self.pc = None
@@ -1,300 +1,314 b''
1 1 """A kernel manager for multiple kernels
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2013 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import absolute_import
20 20
21 21 import os
22 22 import uuid
23 23
24 24 import zmq
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.eventloop import ioloop
26 27
27 28 from IPython.config.configurable import LoggingConfigurable
28 29 from IPython.utils.importstring import import_item
29 30 from IPython.utils.traitlets import (
30 Instance, Dict, Unicode, Any, DottedObjectName,
31 Instance, Dict, Unicode, Any, DottedObjectName, Bool
31 32 )
33 # from IPython.kernel.kernelrestarter import KernelRestarter
34
32 35 #-----------------------------------------------------------------------------
33 36 # Classes
34 37 #-----------------------------------------------------------------------------
35 38
36 39 class DuplicateKernelError(Exception):
37 40 pass
38 41
39 42
40 43 class MultiKernelManager(LoggingConfigurable):
41 44 """A class for managing multiple kernels."""
42 45
43 46 kernel_manager_class = DottedObjectName(
44 47 "IPython.kernel.blockingkernelmanager.BlockingKernelManager", config=True,
45 48 help="""The kernel manager class. This is configurable to allow
46 49 subclassing of the KernelManager for customized behavior.
47 50 """
48 51 )
49 52 def _kernel_manager_class_changed(self, name, old, new):
50 53 self.kernel_manager_factory = import_item(new)
51 54
52 55 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 56 def _kernel_manager_factory_default(self):
54 57 return import_item(self.kernel_manager_class)
55 58
56 59 context = Instance('zmq.Context')
57 60 def _context_default(self):
58 61 return zmq.Context.instance()
59 62
60 63 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
61 64 def _loop_default(self):
62 65 return ioloop.IOLoop.instance()
63 66
64 autorestart = Bool(True, config=True,
67 autorestart = Bool(False, config=True,
65 68 help="""Should we autorestart kernels that die."""
66 69 )
67 70
68 71 connection_dir = Unicode('')
69 72
70 73 _kernels = Dict()
71 74 _restarters = Dict()
72 75
73 76 def list_kernel_ids(self):
74 77 """Return a list of the kernel ids of the active kernels."""
75 78 # Create a copy so we can iterate over kernels in operations
76 79 # that delete keys.
77 80 return list(self._kernels.keys())
78 81
79 82 def __len__(self):
80 83 """Return the number of running kernels."""
81 84 return len(self.list_kernel_ids())
82 85
83 86 def __contains__(self, kernel_id):
84 87 return kernel_id in self._kernels
85 88
86 def start_watching(self, kernel_id):
87 km = self.get_kernel(kernel_id):
89 def start_restarter(self, kernel_id):
90 km = self.get_kernel(kernel_id)
88 91 if self.autorestart:
89 kr = KernelRestarter(
90 kernel_manager=km, loop=self.loop,
91 config=self.config, log=self.log
92 )
92 kr = self._restarters.get(kernel_id, None)
93 if kr is None:
94 kr = KernelRestarter(
95 kernel_manager=km, loop=self.loop,
96 config=self.config, log=self.log
97 )
98 self._restarters[kernel_id] = kr
93 99 kr.start()
94 self._restarters[kernel_id] = kr
95 100
96 def stop_watching(self, kernel_id):
101 def stop_restarter(self, kernel_id):
102 if self.autorestart:
103 kr = self._restarters.get(kernel_id, None)
104 if kr is not None:
105 kr.stop()
97 106
107 def clear_restarter(self, kernel_id):
108 if self.autorestart:
109 kr = self._restarters.pop(kernel_id, None)
110 if kr is not None:
111 kr.stop()
98 112
99 113 def start_kernel(self, **kwargs):
100 114 """Start a new kernel.
101 115
102 116 The caller can pick a kernel_id by passing one in as a keyword arg,
103 117 otherwise one will be picked using a uuid.
104 118
105 119 To silence the kernel's stdout/stderr, call this using::
106 120
107 121 km.start_kernel(stdout=PIPE, stderr=PIPE)
108 122
109 123 """
110 124 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
111 125 if kernel_id in self:
112 126 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
113 127 # kernel_manager_factory is the constructor for the KernelManager
114 128 # subclass we are using. It can be configured as any Configurable,
115 129 # including things like its transport and ip.
116 130 km = self.kernel_manager_factory(connection_file=os.path.join(
117 131 self.connection_dir, "kernel-%s.json" % kernel_id),
118 132 config=self.config,
119 133 )
120 134 km.start_kernel(**kwargs)
121 135 # start just the shell channel, needed for graceful restart
122 136 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
123 137 self._kernels[kernel_id] = km
124 self.start_watching(kernel_id)
138 self.start_restarter(kernel_id)
125 139 return kernel_id
126 140
127 141 def shutdown_kernel(self, kernel_id, now=False):
128 142 """Shutdown a kernel by its kernel uuid.
129 143
130 144 Parameters
131 145 ==========
132 146 kernel_id : uuid
133 147 The id of the kernel to shutdown.
134 148 now : bool
135 149 Should the kernel be shutdown forcibly using a signal.
136 150 """
137 151 k = self.get_kernel(kernel_id)
138 self.stop_watching(kernel_id)
152 self.stop_restarter(kernel_id)
139 153 k.shutdown_kernel(now=now)
140 154 k.shell_channel.stop()
141 155 del self._kernels[kernel_id]
142 self.remove_watching(kernel_id)
156 self.clear_restarter(kernel_id)
143 157
144 158 def shutdown_all(self, now=False):
145 159 """Shutdown all kernels."""
146 160 for kid in self.list_kernel_ids():
147 161 self.shutdown_kernel(kid, now=now)
148 162
149 163 def interrupt_kernel(self, kernel_id):
150 164 """Interrupt (SIGINT) the kernel by its uuid.
151 165
152 166 Parameters
153 167 ==========
154 168 kernel_id : uuid
155 169 The id of the kernel to interrupt.
156 170 """
157 171 return self.get_kernel(kernel_id).interrupt_kernel()
158 172
159 173 def signal_kernel(self, kernel_id, signum):
160 174 """Sends a signal to the kernel by its uuid.
161 175
162 176 Note that since only SIGTERM is supported on Windows, this function
163 177 is only useful on Unix systems.
164 178
165 179 Parameters
166 180 ==========
167 181 kernel_id : uuid
168 182 The id of the kernel to signal.
169 183 """
170 184 return self.get_kernel(kernel_id).signal_kernel(signum)
171 185
172 186 def restart_kernel(self, kernel_id):
173 187 """Restart a kernel by its uuid, keeping the same ports.
174 188
175 189 Parameters
176 190 ==========
177 191 kernel_id : uuid
178 192 The id of the kernel to interrupt.
179 193 """
180 194 km = self.get_kernel(kernel_id)
181 self.stop_watching()
195 self.stop_restarter(kernel_id)
182 196 km.restart_kernel()
183 self.start_watching()
197 self.start_restarter(kernel_id)
184 198
185 199 def is_alive(self, kernel_id):
186 200 """Is the kernel alive.
187 201
188 202 This calls KernelManager.is_alive() which calls Popen.poll on the
189 203 actual kernel subprocess.
190 204
191 205 Parameters
192 206 ==========
193 207 kernel_id : uuid
194 208 The id of the kernel.
195 209 """
196 210 return self.get_kernel(kernel_id).is_alive()
197 211
198 212 def get_kernel(self, kernel_id):
199 213 """Get the single KernelManager object for a kernel by its uuid.
200 214
201 215 Parameters
202 216 ==========
203 217 kernel_id : uuid
204 218 The id of the kernel.
205 219 """
206 220 km = self._kernels.get(kernel_id)
207 221 if km is not None:
208 222 return km
209 223 else:
210 224 raise KeyError("Kernel with id not found: %s" % kernel_id)
211 225
212 226 def get_connection_info(self, kernel_id):
213 227 """Return a dictionary of connection data for a kernel.
214 228
215 229 Parameters
216 230 ==========
217 231 kernel_id : uuid
218 232 The id of the kernel.
219 233
220 234 Returns
221 235 =======
222 236 connection_dict : dict
223 237 A dict of the information needed to connect to a kernel.
224 238 This includes the ip address and the integer port
225 239 numbers of the different channels (stdin_port, iopub_port,
226 240 shell_port, hb_port).
227 241 """
228 242 km = self.get_kernel(kernel_id)
229 243 return dict(transport=km.transport,
230 244 ip=km.ip,
231 245 shell_port=km.shell_port,
232 246 iopub_port=km.iopub_port,
233 247 stdin_port=km.stdin_port,
234 248 hb_port=km.hb_port,
235 249 )
236 250
237 251 def _make_url(self, transport, ip, port):
238 252 """Make a ZeroMQ URL for a given transport, ip and port."""
239 253 if transport == 'tcp':
240 254 return "tcp://%s:%i" % (ip, port)
241 255 else:
242 256 return "%s://%s-%s" % (transport, ip, port)
243 257
244 258 def _create_connected_stream(self, kernel_id, socket_type, channel):
245 259 """Create a connected ZMQStream for a kernel."""
246 260 cinfo = self.get_connection_info(kernel_id)
247 261 url = self._make_url(cinfo['transport'], cinfo['ip'],
248 262 cinfo['%s_port' % channel]
249 263 )
250 264 sock = self.context.socket(socket_type)
251 265 self.log.info("Connecting to: %s" % url)
252 266 sock.connect(url)
253 267 return ZMQStream(sock)
254 268
255 269 def create_iopub_stream(self, kernel_id):
256 270 """Return a ZMQStream object connected to the iopub channel.
257 271
258 272 Parameters
259 273 ==========
260 274 kernel_id : uuid
261 275 The id of the kernel.
262 276
263 277 Returns
264 278 =======
265 279 stream : ZMQStream
266 280 """
267 281 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
268 282 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
269 283 return iopub_stream
270 284
271 285 def create_shell_stream(self, kernel_id):
272 286 """Return a ZMQStream object connected to the shell channel.
273 287
274 288 Parameters
275 289 ==========
276 290 kernel_id : uuid
277 291 The id of the kernel.
278 292
279 293 Returns
280 294 =======
281 295 stream : ZMQStream
282 296 """
283 297 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
284 298 return shell_stream
285 299
286 300 def create_hb_stream(self, kernel_id):
287 301 """Return a ZMQStream object connected to the hb channel.
288 302
289 303 Parameters
290 304 ==========
291 305 kernel_id : uuid
292 306 The id of the kernel.
293 307
294 308 Returns
295 309 =======
296 310 stream : ZMQStream
297 311 """
298 312 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
299 313 return hb_stream
300 314
General Comments 0
You need to be logged in to leave comments. Login now