##// END OF EJS Templates
More work on autorestarting.
Brian Granger -
Show More
@@ -1,62 +1,77 b''
1 """A basic in process kernel monitor with autorestarting.
1 """A basic in process kernel monitor with autorestarting.
2
2
3 This watches a kernel's state using KernelManager.is_alive and auto
3 This watches a kernel's state using KernelManager.is_alive and auto
4 restarts the kernel if it dies.
4 restarts the kernel if it dies.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2013 The IPython Development Team
8 # Copyright (C) 2013 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop
19 from zmq.eventloop import ioloop
20
20
21
21
22 from IPython.config.configurable import LoggingConfigurable
22 from IPython.config.configurable import LoggingConfigurable
23 from IPython.utils.traitlets import (
23 from IPython.utils.traitlets import (
24 Instance, Float
24 Instance, Float
25 )
25 )
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Code
28 # Code
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 class KernelRestarter(LoggingConfigurable):
31 class KernelRestarter(LoggingConfigurable):
32 """Monitor and autorestart a kernel."""
32 """Monitor and autorestart a kernel."""
33
33
34 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
34 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
35 def _loop_default(self):
35 def _loop_default(self):
36 return ioloop.IOLoop.instance()
36 return ioloop.IOLoop.instance()
37
37
38 kernel_manager = Instance('IPython.kernel.kernelmanager.KernelManager')
38 kernel_manager = Instance('IPython.kernel.kernelmanager.KernelManager')
39
39
40 time_to_dead = Float(3.0, config=True,
40 time_to_dead = Float(3.0, config=True,
41 help="""Kernel heartbeat interval in seconds."""
41 help="""Kernel heartbeat interval in seconds."""
42 )
42 )
43
43
44 _pcallback = None
45
44 def __init__(self, **kwargs):
46 def __init__(self, **kwargs):
45 super(KernelRestarter, self).__init__(**kwargs)
47 super(KernelRestarter, self).__init__(**kwargs)
46
48
47 def start(self):
49 def start(self):
48 self.pc = ioloop.PeriodicCallback(self.poll, self.time_to_dead, self.ioloop)
50 """Start the polling of the kernel."""
49 self.pc.start()
51 if self._pcallback is None:
52 self._pcallback = ioloop.PeriodicCallback(
53 self._poll, 1000*self.time_to_dead, self.ioloop
54 )
55 self._pcallback.start()
56
57 def stop(self):
58 """Stop the kernel polling."""
59 if self._pcallback is not None:
60 self._pcallback.stop()
50
61
51 def poll(self):
62 def clear(self):
63 """Clear the underlying PeriodicCallback."""
64 self.stop()
65 if self._pcallback is not None:
66 self._pcallback = None
67
68 def _poll(self):
52 if not self.kernel_manager.is_alive():
69 if not self.kernel_manager.is_alive():
53 self.stop()
70 self.stop()
54 # This restart event should leave the connection file in place so
71 # This restart event should leave the connection file in place so
55 # the ports are the same. Because this takes place below the
72 # the ports are the same. Because this takes place below the
56 # MappingKernelManager, the kernel_id will also remain the same.
73 # MappingKernelManager, the kernel_id will also remain the same.
74 self.log('KernelRestarter: restarting kernel')
57 self.kernel_manager.restart_kernel(now=True);
75 self.kernel_manager.restart_kernel(now=True);
58 self.start()
76 self.start()
59
77
60 def stop(self):
61 self.pc.stop()
62 self.pc = None
@@ -1,300 +1,314 b''
1 """A kernel manager for multiple kernels
1 """A kernel manager for multiple kernels
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2013 The IPython Development Team
9 # Copyright (C) 2013 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import absolute_import
19 from __future__ import absolute_import
20
20
21 import os
21 import os
22 import uuid
22 import uuid
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.eventloop import ioloop
26
27
27 from IPython.config.configurable import LoggingConfigurable
28 from IPython.config.configurable import LoggingConfigurable
28 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
29 from IPython.utils.traitlets import (
30 from IPython.utils.traitlets import (
30 Instance, Dict, Unicode, Any, DottedObjectName,
31 Instance, Dict, Unicode, Any, DottedObjectName, Bool
31 )
32 )
33 # from IPython.kernel.kernelrestarter import KernelRestarter
34
32 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
33 # Classes
36 # Classes
34 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
35
38
36 class DuplicateKernelError(Exception):
39 class DuplicateKernelError(Exception):
37 pass
40 pass
38
41
39
42
40 class MultiKernelManager(LoggingConfigurable):
43 class MultiKernelManager(LoggingConfigurable):
41 """A class for managing multiple kernels."""
44 """A class for managing multiple kernels."""
42
45
43 kernel_manager_class = DottedObjectName(
46 kernel_manager_class = DottedObjectName(
44 "IPython.kernel.blockingkernelmanager.BlockingKernelManager", config=True,
47 "IPython.kernel.blockingkernelmanager.BlockingKernelManager", config=True,
45 help="""The kernel manager class. This is configurable to allow
48 help="""The kernel manager class. This is configurable to allow
46 subclassing of the KernelManager for customized behavior.
49 subclassing of the KernelManager for customized behavior.
47 """
50 """
48 )
51 )
49 def _kernel_manager_class_changed(self, name, old, new):
52 def _kernel_manager_class_changed(self, name, old, new):
50 self.kernel_manager_factory = import_item(new)
53 self.kernel_manager_factory = import_item(new)
51
54
52 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
55 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 def _kernel_manager_factory_default(self):
56 def _kernel_manager_factory_default(self):
54 return import_item(self.kernel_manager_class)
57 return import_item(self.kernel_manager_class)
55
58
56 context = Instance('zmq.Context')
59 context = Instance('zmq.Context')
57 def _context_default(self):
60 def _context_default(self):
58 return zmq.Context.instance()
61 return zmq.Context.instance()
59
62
60 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
63 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
61 def _loop_default(self):
64 def _loop_default(self):
62 return ioloop.IOLoop.instance()
65 return ioloop.IOLoop.instance()
63
66
64 autorestart = Bool(True, config=True,
67 autorestart = Bool(False, config=True,
65 help="""Should we autorestart kernels that die."""
68 help="""Should we autorestart kernels that die."""
66 )
69 )
67
70
68 connection_dir = Unicode('')
71 connection_dir = Unicode('')
69
72
70 _kernels = Dict()
73 _kernels = Dict()
71 _restarters = Dict()
74 _restarters = Dict()
72
75
73 def list_kernel_ids(self):
76 def list_kernel_ids(self):
74 """Return a list of the kernel ids of the active kernels."""
77 """Return a list of the kernel ids of the active kernels."""
75 # Create a copy so we can iterate over kernels in operations
78 # Create a copy so we can iterate over kernels in operations
76 # that delete keys.
79 # that delete keys.
77 return list(self._kernels.keys())
80 return list(self._kernels.keys())
78
81
79 def __len__(self):
82 def __len__(self):
80 """Return the number of running kernels."""
83 """Return the number of running kernels."""
81 return len(self.list_kernel_ids())
84 return len(self.list_kernel_ids())
82
85
83 def __contains__(self, kernel_id):
86 def __contains__(self, kernel_id):
84 return kernel_id in self._kernels
87 return kernel_id in self._kernels
85
88
86 def start_watching(self, kernel_id):
89 def start_restarter(self, kernel_id):
87 km = self.get_kernel(kernel_id):
90 km = self.get_kernel(kernel_id)
88 if self.autorestart:
91 if self.autorestart:
89 kr = KernelRestarter(
92 kr = self._restarters.get(kernel_id, None)
90 kernel_manager=km, loop=self.loop,
93 if kr is None:
91 config=self.config, log=self.log
94 kr = KernelRestarter(
92 )
95 kernel_manager=km, loop=self.loop,
96 config=self.config, log=self.log
97 )
98 self._restarters[kernel_id] = kr
93 kr.start()
99 kr.start()
94 self._restarters[kernel_id] = kr
95
100
96 def stop_watching(self, kernel_id):
101 def stop_restarter(self, kernel_id):
102 if self.autorestart:
103 kr = self._restarters.get(kernel_id, None)
104 if kr is not None:
105 kr.stop()
97
106
107 def clear_restarter(self, kernel_id):
108 if self.autorestart:
109 kr = self._restarters.pop(kernel_id, None)
110 if kr is not None:
111 kr.stop()
98
112
99 def start_kernel(self, **kwargs):
113 def start_kernel(self, **kwargs):
100 """Start a new kernel.
114 """Start a new kernel.
101
115
102 The caller can pick a kernel_id by passing one in as a keyword arg,
116 The caller can pick a kernel_id by passing one in as a keyword arg,
103 otherwise one will be picked using a uuid.
117 otherwise one will be picked using a uuid.
104
118
105 To silence the kernel's stdout/stderr, call this using::
119 To silence the kernel's stdout/stderr, call this using::
106
120
107 km.start_kernel(stdout=PIPE, stderr=PIPE)
121 km.start_kernel(stdout=PIPE, stderr=PIPE)
108
122
109 """
123 """
110 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
124 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
111 if kernel_id in self:
125 if kernel_id in self:
112 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
126 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
113 # kernel_manager_factory is the constructor for the KernelManager
127 # kernel_manager_factory is the constructor for the KernelManager
114 # subclass we are using. It can be configured as any Configurable,
128 # subclass we are using. It can be configured as any Configurable,
115 # including things like its transport and ip.
129 # including things like its transport and ip.
116 km = self.kernel_manager_factory(connection_file=os.path.join(
130 km = self.kernel_manager_factory(connection_file=os.path.join(
117 self.connection_dir, "kernel-%s.json" % kernel_id),
131 self.connection_dir, "kernel-%s.json" % kernel_id),
118 config=self.config,
132 config=self.config,
119 )
133 )
120 km.start_kernel(**kwargs)
134 km.start_kernel(**kwargs)
121 # start just the shell channel, needed for graceful restart
135 # start just the shell channel, needed for graceful restart
122 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
136 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
123 self._kernels[kernel_id] = km
137 self._kernels[kernel_id] = km
124 self.start_watching(kernel_id)
138 self.start_restarter(kernel_id)
125 return kernel_id
139 return kernel_id
126
140
127 def shutdown_kernel(self, kernel_id, now=False):
141 def shutdown_kernel(self, kernel_id, now=False):
128 """Shutdown a kernel by its kernel uuid.
142 """Shutdown a kernel by its kernel uuid.
129
143
130 Parameters
144 Parameters
131 ==========
145 ==========
132 kernel_id : uuid
146 kernel_id : uuid
133 The id of the kernel to shutdown.
147 The id of the kernel to shutdown.
134 now : bool
148 now : bool
135 Should the kernel be shutdown forcibly using a signal.
149 Should the kernel be shutdown forcibly using a signal.
136 """
150 """
137 k = self.get_kernel(kernel_id)
151 k = self.get_kernel(kernel_id)
138 self.stop_watching(kernel_id)
152 self.stop_restarter(kernel_id)
139 k.shutdown_kernel(now=now)
153 k.shutdown_kernel(now=now)
140 k.shell_channel.stop()
154 k.shell_channel.stop()
141 del self._kernels[kernel_id]
155 del self._kernels[kernel_id]
142 self.remove_watching(kernel_id)
156 self.clear_restarter(kernel_id)
143
157
144 def shutdown_all(self, now=False):
158 def shutdown_all(self, now=False):
145 """Shutdown all kernels."""
159 """Shutdown all kernels."""
146 for kid in self.list_kernel_ids():
160 for kid in self.list_kernel_ids():
147 self.shutdown_kernel(kid, now=now)
161 self.shutdown_kernel(kid, now=now)
148
162
149 def interrupt_kernel(self, kernel_id):
163 def interrupt_kernel(self, kernel_id):
150 """Interrupt (SIGINT) the kernel by its uuid.
164 """Interrupt (SIGINT) the kernel by its uuid.
151
165
152 Parameters
166 Parameters
153 ==========
167 ==========
154 kernel_id : uuid
168 kernel_id : uuid
155 The id of the kernel to interrupt.
169 The id of the kernel to interrupt.
156 """
170 """
157 return self.get_kernel(kernel_id).interrupt_kernel()
171 return self.get_kernel(kernel_id).interrupt_kernel()
158
172
159 def signal_kernel(self, kernel_id, signum):
173 def signal_kernel(self, kernel_id, signum):
160 """Sends a signal to the kernel by its uuid.
174 """Sends a signal to the kernel by its uuid.
161
175
162 Note that since only SIGTERM is supported on Windows, this function
176 Note that since only SIGTERM is supported on Windows, this function
163 is only useful on Unix systems.
177 is only useful on Unix systems.
164
178
165 Parameters
179 Parameters
166 ==========
180 ==========
167 kernel_id : uuid
181 kernel_id : uuid
168 The id of the kernel to signal.
182 The id of the kernel to signal.
169 """
183 """
170 return self.get_kernel(kernel_id).signal_kernel(signum)
184 return self.get_kernel(kernel_id).signal_kernel(signum)
171
185
172 def restart_kernel(self, kernel_id):
186 def restart_kernel(self, kernel_id):
173 """Restart a kernel by its uuid, keeping the same ports.
187 """Restart a kernel by its uuid, keeping the same ports.
174
188
175 Parameters
189 Parameters
176 ==========
190 ==========
177 kernel_id : uuid
191 kernel_id : uuid
178 The id of the kernel to interrupt.
192 The id of the kernel to interrupt.
179 """
193 """
180 km = self.get_kernel(kernel_id)
194 km = self.get_kernel(kernel_id)
181 self.stop_watching()
195 self.stop_restarter(kernel_id)
182 km.restart_kernel()
196 km.restart_kernel()
183 self.start_watching()
197 self.start_restarter(kernel_id)
184
198
185 def is_alive(self, kernel_id):
199 def is_alive(self, kernel_id):
186 """Is the kernel alive.
200 """Is the kernel alive.
187
201
188 This calls KernelManager.is_alive() which calls Popen.poll on the
202 This calls KernelManager.is_alive() which calls Popen.poll on the
189 actual kernel subprocess.
203 actual kernel subprocess.
190
204
191 Parameters
205 Parameters
192 ==========
206 ==========
193 kernel_id : uuid
207 kernel_id : uuid
194 The id of the kernel.
208 The id of the kernel.
195 """
209 """
196 return self.get_kernel(kernel_id).is_alive()
210 return self.get_kernel(kernel_id).is_alive()
197
211
198 def get_kernel(self, kernel_id):
212 def get_kernel(self, kernel_id):
199 """Get the single KernelManager object for a kernel by its uuid.
213 """Get the single KernelManager object for a kernel by its uuid.
200
214
201 Parameters
215 Parameters
202 ==========
216 ==========
203 kernel_id : uuid
217 kernel_id : uuid
204 The id of the kernel.
218 The id of the kernel.
205 """
219 """
206 km = self._kernels.get(kernel_id)
220 km = self._kernels.get(kernel_id)
207 if km is not None:
221 if km is not None:
208 return km
222 return km
209 else:
223 else:
210 raise KeyError("Kernel with id not found: %s" % kernel_id)
224 raise KeyError("Kernel with id not found: %s" % kernel_id)
211
225
212 def get_connection_info(self, kernel_id):
226 def get_connection_info(self, kernel_id):
213 """Return a dictionary of connection data for a kernel.
227 """Return a dictionary of connection data for a kernel.
214
228
215 Parameters
229 Parameters
216 ==========
230 ==========
217 kernel_id : uuid
231 kernel_id : uuid
218 The id of the kernel.
232 The id of the kernel.
219
233
220 Returns
234 Returns
221 =======
235 =======
222 connection_dict : dict
236 connection_dict : dict
223 A dict of the information needed to connect to a kernel.
237 A dict of the information needed to connect to a kernel.
224 This includes the ip address and the integer port
238 This includes the ip address and the integer port
225 numbers of the different channels (stdin_port, iopub_port,
239 numbers of the different channels (stdin_port, iopub_port,
226 shell_port, hb_port).
240 shell_port, hb_port).
227 """
241 """
228 km = self.get_kernel(kernel_id)
242 km = self.get_kernel(kernel_id)
229 return dict(transport=km.transport,
243 return dict(transport=km.transport,
230 ip=km.ip,
244 ip=km.ip,
231 shell_port=km.shell_port,
245 shell_port=km.shell_port,
232 iopub_port=km.iopub_port,
246 iopub_port=km.iopub_port,
233 stdin_port=km.stdin_port,
247 stdin_port=km.stdin_port,
234 hb_port=km.hb_port,
248 hb_port=km.hb_port,
235 )
249 )
236
250
237 def _make_url(self, transport, ip, port):
251 def _make_url(self, transport, ip, port):
238 """Make a ZeroMQ URL for a given transport, ip and port."""
252 """Make a ZeroMQ URL for a given transport, ip and port."""
239 if transport == 'tcp':
253 if transport == 'tcp':
240 return "tcp://%s:%i" % (ip, port)
254 return "tcp://%s:%i" % (ip, port)
241 else:
255 else:
242 return "%s://%s-%s" % (transport, ip, port)
256 return "%s://%s-%s" % (transport, ip, port)
243
257
244 def _create_connected_stream(self, kernel_id, socket_type, channel):
258 def _create_connected_stream(self, kernel_id, socket_type, channel):
245 """Create a connected ZMQStream for a kernel."""
259 """Create a connected ZMQStream for a kernel."""
246 cinfo = self.get_connection_info(kernel_id)
260 cinfo = self.get_connection_info(kernel_id)
247 url = self._make_url(cinfo['transport'], cinfo['ip'],
261 url = self._make_url(cinfo['transport'], cinfo['ip'],
248 cinfo['%s_port' % channel]
262 cinfo['%s_port' % channel]
249 )
263 )
250 sock = self.context.socket(socket_type)
264 sock = self.context.socket(socket_type)
251 self.log.info("Connecting to: %s" % url)
265 self.log.info("Connecting to: %s" % url)
252 sock.connect(url)
266 sock.connect(url)
253 return ZMQStream(sock)
267 return ZMQStream(sock)
254
268
255 def create_iopub_stream(self, kernel_id):
269 def create_iopub_stream(self, kernel_id):
256 """Return a ZMQStream object connected to the iopub channel.
270 """Return a ZMQStream object connected to the iopub channel.
257
271
258 Parameters
272 Parameters
259 ==========
273 ==========
260 kernel_id : uuid
274 kernel_id : uuid
261 The id of the kernel.
275 The id of the kernel.
262
276
263 Returns
277 Returns
264 =======
278 =======
265 stream : ZMQStream
279 stream : ZMQStream
266 """
280 """
267 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
281 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
268 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
282 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
269 return iopub_stream
283 return iopub_stream
270
284
271 def create_shell_stream(self, kernel_id):
285 def create_shell_stream(self, kernel_id):
272 """Return a ZMQStream object connected to the shell channel.
286 """Return a ZMQStream object connected to the shell channel.
273
287
274 Parameters
288 Parameters
275 ==========
289 ==========
276 kernel_id : uuid
290 kernel_id : uuid
277 The id of the kernel.
291 The id of the kernel.
278
292
279 Returns
293 Returns
280 =======
294 =======
281 stream : ZMQStream
295 stream : ZMQStream
282 """
296 """
283 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
297 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
284 return shell_stream
298 return shell_stream
285
299
286 def create_hb_stream(self, kernel_id):
300 def create_hb_stream(self, kernel_id):
287 """Return a ZMQStream object connected to the hb channel.
301 """Return a ZMQStream object connected to the hb channel.
288
302
289 Parameters
303 Parameters
290 ==========
304 ==========
291 kernel_id : uuid
305 kernel_id : uuid
292 The id of the kernel.
306 The id of the kernel.
293
307
294 Returns
308 Returns
295 =======
309 =======
296 stream : ZMQStream
310 stream : ZMQStream
297 """
311 """
298 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
312 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
299 return hb_stream
313 return hb_stream
300
314
General Comments 0
You need to be logged in to leave comments. Login now