##// END OF EJS Templates
cleanup boundaries of MultiKernelManager and KernelRestarter classes...
MinRK -
Show More
@@ -0,0 +1,67 b''
1 """A basic kernel monitor with autorestarting.
2
3 This watches a kernel's state using KernelManager.is_alive and auto
4 restarts the kernel if it dies.
5
6 It is an incomplete base class, and must be subclassed.
7 """
8
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2013 The IPython Development Team
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
19
20
21 from IPython.config.configurable import LoggingConfigurable
22 from IPython.utils.traitlets import (
23 Instance, Float, List,
24 )
25
26 #-----------------------------------------------------------------------------
27 # Code
28 #-----------------------------------------------------------------------------
29
class KernelRestarter(LoggingConfigurable):
    """Monitor and autorestart a kernel.

    This is an incomplete base class: ``start`` and ``stop`` raise
    NotImplementedError and must be provided by a subclass, which is
    expected to arrange for ``poll`` to be called periodically.
    """

    kernel_manager = Instance('IPython.kernel.KernelManager')

    time_to_dead = Float(3.0, config=True,
        help="""Kernel heartbeat interval in seconds."""
    )
    # Callables invoked (with no arguments) by poll() right before a dead
    # kernel is restarted.
    _callbacks = List()

    def start(self):
        """Start the polling of the kernel."""
        raise NotImplementedError("Must be implemented in a subclass")

    def stop(self):
        """Stop the kernel polling."""
        raise NotImplementedError("Must be implemented in a subclass")

    def register_callback(self, f):
        """Register a callback to fire before a dead kernel is restarted.

        Parameters
        ==========
        f : callable
            Called with no arguments from ``poll``.
        """
        # The trait is named ``_callbacks``; ``self.callbacks`` does not exist.
        self._callbacks.append(f)

    def unregister_callback(self, f):
        """Remove a registered callback; unknown callbacks are ignored."""
        try:
            self._callbacks.remove(f)
        except ValueError:
            # f was never registered (or already removed): nothing to do.
            pass

    def poll(self):
        """Check the kernel once; fire callbacks and restart it if it died."""
        self.log.debug('Polling kernel...')
        if not self.kernel_manager.is_alive():
            self.log.info('KernelRestarter: restarting kernel')
            for callback in self._callbacks:
                try:
                    callback()
                except Exception:
                    # A failing callback must not prevent the restart below;
                    # log it with the traceback and carry on.
                    self.log.error("Kernel restart callback %r failed", callback, exc_info=True)
            self.kernel_manager.restart_kernel(now=True)
@@ -480,7 +480,7 b' class IOPubHandler(AuthenticatedZMQStreamHandler):'
480 km = self.application.kernel_manager
480 km = self.application.kernel_manager
481 kernel_id = self.kernel_id
481 kernel_id = self.kernel_id
482 try:
482 try:
483 self.iopub_stream = km.create_iopub_stream(kernel_id)
483 self.iopub_stream = km.connect_iopub(kernel_id)
484 except web.HTTPError:
484 except web.HTTPError:
485 # WebSockets don't response to traditional error codes so we
485 # WebSockets don't response to traditional error codes so we
486 # close the connection.
486 # close the connection.
@@ -517,7 +517,7 b' class ShellHandler(AuthenticatedZMQStreamHandler):'
517 self.max_msg_size = km.max_msg_size
517 self.max_msg_size = km.max_msg_size
518 kernel_id = self.kernel_id
518 kernel_id = self.kernel_id
519 try:
519 try:
520 self.shell_stream = km.create_shell_stream(kernel_id)
520 self.shell_stream = km.connect_shell(kernel_id)
521 except web.HTTPError:
521 except web.HTTPError:
522 # WebSockets don't response to traditional error codes so we
522 # WebSockets don't response to traditional error codes so we
523 # close the connection.
523 # close the connection.
@@ -31,6 +31,9 b' from IPython.utils.traitlets import ('
31 class MappingKernelManager(MultiKernelManager):
31 class MappingKernelManager(MultiKernelManager):
32 """A KernelManager that handles notebok mapping and HTTP error handling"""
32 """A KernelManager that handles notebok mapping and HTTP error handling"""
33
33
34 def _kernel_manager_class_default(self):
35 return "IPython.kernel.ioloop.IOLoopKernelManager"
36
34 kernel_argv = List(Unicode)
37 kernel_argv = List(Unicode)
35
38
36 max_msg_size = Integer(65536, config=True, help="""
39 max_msg_size = Integer(65536, config=True, help="""
@@ -88,42 +91,7 b' class MappingKernelManager(MultiKernelManager):'
88 self.log.info("Using existing kernel: %s" % kernel_id)
91 self.log.info("Using existing kernel: %s" % kernel_id)
89 return kernel_id
92 return kernel_id
90
93
91 def shutdown_kernel(self, kernel_id, now=False):
94 # override _check_kernel_id to raise 404 instead of KeyError
92 """Shutdown a kernel and remove its notebook association."""
93 self._check_kernel_id(kernel_id)
94 super(MappingKernelManager, self).shutdown_kernel(
95 kernel_id, now=now
96 )
97 self.delete_mapping_for_kernel(kernel_id)
98 self.log.info("Kernel shutdown: %s" % kernel_id)
99
100 def interrupt_kernel(self, kernel_id):
101 """Interrupt a kernel."""
102 self._check_kernel_id(kernel_id)
103 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
104 self.log.info("Kernel interrupted: %s" % kernel_id)
105
106 def restart_kernel(self, kernel_id):
107 """Restart a kernel while keeping clients connected."""
108 self._check_kernel_id(kernel_id)
109 super(MappingKernelManager, self).restart_kernel(kernel_id)
110 self.log.info("Kernel restarted: %s" % kernel_id)
111
112 def create_iopub_stream(self, kernel_id):
113 """Create a new iopub stream."""
114 self._check_kernel_id(kernel_id)
115 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
116
117 def create_shell_stream(self, kernel_id):
118 """Create a new shell stream."""
119 self._check_kernel_id(kernel_id)
120 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
121
122 def create_hb_stream(self, kernel_id):
123 """Create a new hb stream."""
124 self._check_kernel_id(kernel_id)
125 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
126
127 def _check_kernel_id(self, kernel_id):
95 def _check_kernel_id(self, kernel_id):
128 """Check a that a kernel_id exists and raise 404 if not."""
96 """Check a that a kernel_id exists and raise 404 if not."""
129 if kernel_id not in self:
97 if kernel_id not in self:
@@ -15,6 +15,7 b' from __future__ import absolute_import'
15
15
16 import zmq
16 import zmq
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18 from zmq.eventloop.zmqstream import ZMQStream
18
19
19 from IPython.utils.traitlets import (
20 from IPython.utils.traitlets import (
20 Instance
21 Instance
@@ -27,6 +28,13 b' from .restarter import IOLoopKernelRestarter'
27 # Code
28 # Code
28 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
29
30
31
def as_zmqstream(f):
    """Wrap a method returning a socket so it returns a ZMQStream instead.

    The stream is built from the value returned by ``f`` and the
    instance's ``loop`` attribute.
    """
    def wrapped(self, *args, **kwargs):
        return ZMQStream(f(self, *args, **kwargs), self.loop)
    return wrapped
37
30 class IOLoopKernelManager(KernelManager):
38 class IOLoopKernelManager(KernelManager):
31
39
32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
40 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
@@ -48,3 +56,8 b' class IOLoopKernelManager(KernelManager):'
48 if self.autorestart:
56 if self.autorestart:
49 if self._restarter is not None:
57 if self._restarter is not None:
50 self._restarter.stop()
58 self._restarter.stop()
59
60 connect_shell = as_zmqstream(KernelManager.connect_shell)
61 connect_iopub = as_zmqstream(KernelManager.connect_iopub)
62 connect_stdin = as_zmqstream(KernelManager.connect_stdin)
63 connect_hb = as_zmqstream(KernelManager.connect_hb)
@@ -21,24 +21,22 b' import zmq'
21 from zmq.eventloop import ioloop
21 from zmq.eventloop import ioloop
22
22
23
23
24 from IPython.config.configurable import LoggingConfigurable
24 from IPython.kernel.restarter import KernelRestarter
25 from IPython.utils.traitlets import (
25 from IPython.utils.traitlets import (
26 Instance, Float
26 Instance, Float, List,
27 )
27 )
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Code
30 # Code
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 class IOLoopKernelRestarter(LoggingConfigurable):
33 class IOLoopKernelRestarter(KernelRestarter):
34 """Monitor and autorestart a kernel."""
34 """Monitor and autorestart a kernel."""
35
35
36 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
36 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
37 def _loop_default(self):
37 def _loop_default(self):
38 return ioloop.IOLoop.instance()
38 return ioloop.IOLoop.instance()
39
39
40 kernel_manager = Instance('IPython.kernel.KernelManager')
41
42 time_to_dead = Float(3.0, config=True,
40 time_to_dead = Float(3.0, config=True,
43 help="""Kernel heartbeat interval in seconds."""
41 help="""Kernel heartbeat interval in seconds."""
44 )
42 )
@@ -49,7 +47,7 b' class IOLoopKernelRestarter(LoggingConfigurable):'
49 """Start the polling of the kernel."""
47 """Start the polling of the kernel."""
50 if self._pcallback is None:
48 if self._pcallback is None:
51 self._pcallback = ioloop.PeriodicCallback(
49 self._pcallback = ioloop.PeriodicCallback(
52 self._poll, 1000*self.time_to_dead, self.loop
50 self.poll, 1000*self.time_to_dead, self.loop
53 )
51 )
54 self._pcallback.start()
52 self._pcallback.start()
55
53
@@ -61,14 +59,4 b' class IOLoopKernelRestarter(LoggingConfigurable):'
61 def clear(self):
59 def clear(self):
62 """Clear the underlying PeriodicCallback."""
60 """Clear the underlying PeriodicCallback."""
63 self.stop()
61 self.stop()
64 if self._pcallback is not None:
62 self._pcallback = None
65 self._pcallback = None
66
67 def _poll(self):
68 self.log.debug('Polling kernel...')
69 if not self.kernel_manager.is_alive():
70 # This restart event should leave the connection file in place so
71 # the ports are the same. Because this takes place below the
72 # MappingKernelManager, the kernel_id will also remain the same.
73 self.log.info('KernelRestarter: restarting kernel')
74 self.kernel_manager.restart_kernel(now=True)
@@ -41,6 +41,13 b' from .managerabc import ('
41 # Main kernel manager class
41 # Main kernel manager class
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44 _socket_types = {
45 'hb' : zmq.REQ,
46 'shell' : zmq.DEALER,
47 'iopub' : zmq.SUB,
48 'stdin' : zmq.DEALER,
49 }
50
44 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
51 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
45 """Manages a single kernel in a subprocess on this host.
52 """Manages a single kernel in a subprocess on this host.
46
53
@@ -93,6 +100,59 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
93 pass
100 pass
94
101
95 #--------------------------------------------------------------------------
102 #--------------------------------------------------------------------------
103 # Connection info
104 #--------------------------------------------------------------------------
105
def get_connection_info(self):
    """Return the connection info as a dict.

    The dict holds ``transport`` and ``ip`` plus one ``<channel>_port``
    entry for each of the shell, iopub, stdin and hb channels.
    """
    info = {'transport': self.transport, 'ip': self.ip}
    for channel in ('shell', 'iopub', 'stdin', 'hb'):
        key = '%s_port' % channel
        info[key] = getattr(self, key)
    return info
116
117 def _make_url(self, channel):
118 """Make a ZeroMQ URL for a given channel."""
119 transport = self.transport
120 ip = self.ip
121 port = getattr(self, '%s_port' % channel)
122
123 if transport == 'tcp':
124 return "tcp://%s:%i" % (ip, port)
125 else:
126 return "%s://%s-%s" % (transport, ip, port)
127
def _create_connected_socket(self, channel):
    """Create a zmq Socket for ``channel`` and connect it to the kernel.

    The socket type is looked up in the module-level ``_socket_types``
    table and the socket is created from ``self.context``.
    """
    url = self._make_url(channel)
    sock = self.context.socket(_socket_types[channel])
    self.log.info("Connecting to: %s" % url)
    sock.connect(url)
    return sock
136
def connect_iopub(self):
    """Return a zmq Socket connected to the IOPub channel.

    The socket is subscribed with an empty prefix before being returned.
    """
    iopub = self._create_connected_socket('iopub')
    iopub.setsockopt(zmq.SUBSCRIBE, b'')
    return iopub
142
def connect_shell(self):
    """Return a zmq Socket connected to the Shell channel."""
    shell = self._create_connected_socket('shell')
    return shell
146
def connect_stdin(self):
    """Return a zmq Socket connected to the StdIn channel."""
    stdin = self._create_connected_socket('stdin')
    return stdin
150
def connect_hb(self):
    """Return a zmq Socket connected to the Heartbeat channel."""
    heartbeat = self._create_connected_socket('hb')
    return heartbeat
154
155 #--------------------------------------------------------------------------
96 # Kernel management
156 # Kernel management
97 #--------------------------------------------------------------------------
157 #--------------------------------------------------------------------------
98
158
@@ -22,7 +22,6 b' import os'
22 import uuid
22 import uuid
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop.zmqstream import ZMQStream
26
25
27 from IPython.config.configurable import LoggingConfigurable
26 from IPython.config.configurable import LoggingConfigurable
28 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
@@ -38,6 +37,23 b' class DuplicateKernelError(Exception):'
38 pass
37 pass
39
38
40
39
40
def kernel_method(f):
    """Decorator for proxying MKM.method(kernel_id) to individual KMs by ID.

    The decorated method ``f`` supplies the name of the per-kernel method
    to call. The wrapper looks the kernel up with ``self.get_kernel``,
    calls the same-named method on it with the remaining arguments, then
    runs ``f`` itself (for logging, bookkeeping, ...) and finally returns
    the *kernel method's* result. The return value of ``f`` is discarded.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # wraps() keeps f's __name__ and __doc__ on the wrapper, so the
    # docstrings written on the decorated methods remain visible.
    @wraps(f)
    def wrapped(self, kernel_id, *args, **kwargs):
        # get the kernel
        km = self.get_kernel(kernel_id)
        method = getattr(km, f.__name__)
        # call the kernel's method
        r = method(*args, **kwargs)
        # last thing, call anything defined in the actual class method
        # such as logging messages
        f(self, kernel_id, *args, **kwargs)
        # return the method result
        return r
    return wrapped
55
56
41 class MultiKernelManager(LoggingConfigurable):
57 class MultiKernelManager(LoggingConfigurable):
42 """A class for managing multiple kernels."""
58 """A class for managing multiple kernels."""
43
59
@@ -100,6 +116,7 b' class MultiKernelManager(LoggingConfigurable):'
100 self._kernels[kernel_id] = km
116 self._kernels[kernel_id] = km
101 return kernel_id
117 return kernel_id
102
118
119 @kernel_method
103 def shutdown_kernel(self, kernel_id, now=False):
120 def shutdown_kernel(self, kernel_id, now=False):
104 """Shutdown a kernel by its kernel uuid.
121 """Shutdown a kernel by its kernel uuid.
105
122
@@ -110,8 +127,7 b' class MultiKernelManager(LoggingConfigurable):'
110 now : bool
127 now : bool
111 Should the kernel be shutdown forcibly using a signal.
128 Should the kernel be shutdown forcibly using a signal.
112 """
129 """
113 k = self.get_kernel(kernel_id)
130 self.log.info("Kernel shutdown: %s" % kernel_id)
114 k.shutdown_kernel(now=now)
115 del self._kernels[kernel_id]
131 del self._kernels[kernel_id]
116
132
117 def shutdown_all(self, now=False):
133 def shutdown_all(self, now=False):
@@ -119,6 +135,7 b' class MultiKernelManager(LoggingConfigurable):'
119 for kid in self.list_kernel_ids():
135 for kid in self.list_kernel_ids():
120 self.shutdown_kernel(kid, now=now)
136 self.shutdown_kernel(kid, now=now)
121
137
138 @kernel_method
122 def interrupt_kernel(self, kernel_id):
139 def interrupt_kernel(self, kernel_id):
123 """Interrupt (SIGINT) the kernel by its uuid.
140 """Interrupt (SIGINT) the kernel by its uuid.
124
141
@@ -127,8 +144,9 b' class MultiKernelManager(LoggingConfigurable):'
127 kernel_id : uuid
144 kernel_id : uuid
128 The id of the kernel to interrupt.
145 The id of the kernel to interrupt.
129 """
146 """
130 return self.get_kernel(kernel_id).interrupt_kernel()
147 self.log.info("Kernel interrupted: %s" % kernel_id)
131
148
149 @kernel_method
132 def signal_kernel(self, kernel_id, signum):
150 def signal_kernel(self, kernel_id, signum):
133 """Sends a signal to the kernel by its uuid.
151 """Sends a signal to the kernel by its uuid.
134
152
@@ -140,8 +158,9 b' class MultiKernelManager(LoggingConfigurable):'
140 kernel_id : uuid
158 kernel_id : uuid
141 The id of the kernel to signal.
159 The id of the kernel to signal.
142 """
160 """
143 return self.get_kernel(kernel_id).signal_kernel(signum)
161 self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))
144
162
163 @kernel_method
145 def restart_kernel(self, kernel_id):
164 def restart_kernel(self, kernel_id):
146 """Restart a kernel by its uuid, keeping the same ports.
165 """Restart a kernel by its uuid, keeping the same ports.
147
166
@@ -150,9 +169,9 b' class MultiKernelManager(LoggingConfigurable):'
150 kernel_id : uuid
169 kernel_id : uuid
151 The id of the kernel to interrupt.
170 The id of the kernel to interrupt.
152 """
171 """
153 km = self.get_kernel(kernel_id)
172 self.log.info("Kernel restarted: %s" % kernel_id)
154 km.restart_kernel()
155
173
174 @kernel_method
156 def is_alive(self, kernel_id):
175 def is_alive(self, kernel_id):
157 """Is the kernel alive.
176 """Is the kernel alive.
158
177
@@ -164,7 +183,11 b' class MultiKernelManager(LoggingConfigurable):'
164 kernel_id : uuid
183 kernel_id : uuid
165 The id of the kernel.
184 The id of the kernel.
166 """
185 """
167 return self.get_kernel(kernel_id).is_alive()
186
187 def _check_kernel_id(self, kernel_id):
188 """check that a kernel id is valid"""
189 if kernel_id not in self:
190 raise KeyError("Kernel with id not found: %s" % kernel_id)
168
191
169 def get_kernel(self, kernel_id):
192 def get_kernel(self, kernel_id):
170 """Get the single KernelManager object for a kernel by its uuid.
193 """Get the single KernelManager object for a kernel by its uuid.
@@ -174,12 +197,10 b' class MultiKernelManager(LoggingConfigurable):'
174 kernel_id : uuid
197 kernel_id : uuid
175 The id of the kernel.
198 The id of the kernel.
176 """
199 """
177 km = self._kernels.get(kernel_id)
200 self._check_kernel_id(kernel_id)
178 if km is not None:
201 return self._kernels[kernel_id]
179 return km
180 else:
181 raise KeyError("Kernel with id not found: %s" % kernel_id)
182
202
203 @kernel_method
183 def get_connection_info(self, kernel_id):
204 def get_connection_info(self, kernel_id):
184 """Return a dictionary of connection data for a kernel.
205 """Return a dictionary of connection data for a kernel.
185
206
@@ -196,35 +217,10 b' class MultiKernelManager(LoggingConfigurable):'
196 numbers of the different channels (stdin_port, iopub_port,
217 numbers of the different channels (stdin_port, iopub_port,
197 shell_port, hb_port).
218 shell_port, hb_port).
198 """
219 """
199 km = self.get_kernel(kernel_id)
200 return dict(transport=km.transport,
201 ip=km.ip,
202 shell_port=km.shell_port,
203 iopub_port=km.iopub_port,
204 stdin_port=km.stdin_port,
205 hb_port=km.hb_port,
206 )
207
208 def _make_url(self, transport, ip, port):
209 """Make a ZeroMQ URL for a given transport, ip and port."""
210 if transport == 'tcp':
211 return "tcp://%s:%i" % (ip, port)
212 else:
213 return "%s://%s-%s" % (transport, ip, port)
214
215 def _create_connected_stream(self, kernel_id, socket_type, channel):
216 """Create a connected ZMQStream for a kernel."""
217 cinfo = self.get_connection_info(kernel_id)
218 url = self._make_url(cinfo['transport'], cinfo['ip'],
219 cinfo['%s_port' % channel]
220 )
221 sock = self.context.socket(socket_type)
222 self.log.info("Connecting to: %s" % url)
223 sock.connect(url)
224 return ZMQStream(sock)
225
220
226 def create_iopub_stream(self, kernel_id):
221 @kernel_method
227 """Return a ZMQStream object connected to the iopub channel.
222 def connect_iopub(self, kernel_id):
223 """Return a zmq Socket connected to the iopub channel.
228
224
229 Parameters
225 Parameters
230 ==========
226 ==========
@@ -233,14 +229,12 b' class MultiKernelManager(LoggingConfigurable):'
233
229
234 Returns
230 Returns
235 =======
231 =======
236 stream : ZMQStream
232 stream : zmq Socket or ZMQStream
237 """
233 """
238 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
239 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
240 return iopub_stream
241
234
242 def create_shell_stream(self, kernel_id):
235 @kernel_method
243 """Return a ZMQStream object connected to the shell channel.
236 def connect_shell(self, kernel_id):
237 """Return a zmq Socket connected to the shell channel.
244
238
245 Parameters
239 Parameters
246 ==========
240 ==========
@@ -249,13 +243,12 b' class MultiKernelManager(LoggingConfigurable):'
249
243
250 Returns
244 Returns
251 =======
245 =======
252 stream : ZMQStream
246 stream : zmq Socket or ZMQStream
253 """
247 """
254 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
255 return shell_stream
256
248
257 def create_hb_stream(self, kernel_id):
249 @kernel_method
258 """Return a ZMQStream object connected to the hb channel.
250 def connect_stdin(self, kernel_id):
251 """Return a zmq Socket connected to the stdin channel.
259
252
260 Parameters
253 Parameters
261 ==========
254 ==========
@@ -264,8 +257,19 b' class MultiKernelManager(LoggingConfigurable):'
264
257
265 Returns
258 Returns
266 =======
259 =======
267 stream : ZMQStream
260 stream : zmq Socket or ZMQStream
268 """
261 """
269 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
270 return hb_stream
271
262
263 @kernel_method
264 def connect_hb(self, kernel_id):
265 """Return a zmq Socket connected to the hb channel.
266
267 Parameters
268 ==========
269 kernel_id : uuid
270 The id of the kernel.
271
272 Returns
273 =======
274 stream : zmq Socket or ZMQStream
275 """
@@ -57,13 +57,13 b' class TestKernelManager(TestCase):'
57 self.assertEqual(ip, cinfo['ip'])
57 self.assertEqual(ip, cinfo['ip'])
58 self.assertTrue('stdin_port' in cinfo)
58 self.assertTrue('stdin_port' in cinfo)
59 self.assertTrue('iopub_port' in cinfo)
59 self.assertTrue('iopub_port' in cinfo)
60 stream = km.create_iopub_stream(kid)
60 stream = km.connect_iopub(kid)
61 stream.close()
61 stream.close()
62 self.assertTrue('shell_port' in cinfo)
62 self.assertTrue('shell_port' in cinfo)
63 stream = km.create_shell_stream(kid)
63 stream = km.connect_shell(kid)
64 stream.close()
64 stream.close()
65 self.assertTrue('hb_port' in cinfo)
65 self.assertTrue('hb_port' in cinfo)
66 stream = km.create_hb_stream(kid)
66 stream = km.connect_hb(kid)
67 stream.close()
67 stream.close()
68 km.shutdown_kernel(kid)
68 km.shutdown_kernel(kid)
69
69
General Comments 0
You need to be logged in to leave comments. Login now