##// END OF EJS Templates
enable graceful restart of kernels in notebook
MinRK -
Show More
@@ -1,324 +1,326 b''
1 """A kernel manager for multiple kernels.
1 """A kernel manager for multiple kernels.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22 import uuid
22 import uuid
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
26
26
27 from tornado import web
27 from tornado import web
28
28
29 from IPython.config.configurable import LoggingConfigurable
29 from IPython.config.configurable import LoggingConfigurable
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 Instance, Dict, List, Unicode, Float, Integer, Any, DottedObjectName,
32 Instance, Dict, List, Unicode, Float, Integer, Any, DottedObjectName,
33 )
33 )
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Classes
35 # Classes
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 class DuplicateKernelError(Exception):
38 class DuplicateKernelError(Exception):
39 pass
39 pass
40
40
41
41
42 class MultiKernelManager(LoggingConfigurable):
42 class MultiKernelManager(LoggingConfigurable):
43 """A class for managing multiple kernels."""
43 """A class for managing multiple kernels."""
44
44
45 kernel_manager_class = DottedObjectName(
45 kernel_manager_class = DottedObjectName(
46 "IPython.zmq.kernelmanager.KernelManager", config=True,
46 "IPython.zmq.blockingkernelmanager.BlockingKernelManager", config=True,
47 help="""The kernel manager class. This is configurable to allow
47 help="""The kernel manager class. This is configurable to allow
48 subclassing of the KernelManager for customized behavior.
48 subclassing of the KernelManager for customized behavior.
49 """
49 """
50 )
50 )
51 def _kernel_manager_class_changed(self, name, old, new):
51 def _kernel_manager_class_changed(self, name, old, new):
52 self.kernel_manager_factory = import_item(new)
52 self.kernel_manager_factory = import_item(new)
53
53
54 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
54 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
55 def _kernel_manager_factory_default(self):
55 def _kernel_manager_factory_default(self):
56 return import_item(self.kernel_manager_class)
56 return import_item(self.kernel_manager_class)
57
57
58 context = Instance('zmq.Context')
58 context = Instance('zmq.Context')
59 def _context_default(self):
59 def _context_default(self):
60 return zmq.Context.instance()
60 return zmq.Context.instance()
61
61
62 connection_dir = Unicode('')
62 connection_dir = Unicode('')
63
63
64 _kernels = Dict()
64 _kernels = Dict()
65
65
66 @property
66 @property
67 def kernel_ids(self):
67 def kernel_ids(self):
68 """Return a list of the kernel ids of the active kernels."""
68 """Return a list of the kernel ids of the active kernels."""
69 return self._kernels.keys()
69 return self._kernels.keys()
70
70
71 def __len__(self):
71 def __len__(self):
72 """Return the number of running kernels."""
72 """Return the number of running kernels."""
73 return len(self.kernel_ids)
73 return len(self.kernel_ids)
74
74
75 def __contains__(self, kernel_id):
75 def __contains__(self, kernel_id):
76 if kernel_id in self.kernel_ids:
76 if kernel_id in self.kernel_ids:
77 return True
77 return True
78 else:
78 else:
79 return False
79 return False
80
80
81 def start_kernel(self, **kwargs):
81 def start_kernel(self, **kwargs):
82 """Start a new kernel."""
82 """Start a new kernel."""
83 kernel_id = unicode(uuid.uuid4())
83 kernel_id = unicode(uuid.uuid4())
84 # use base KernelManager for each Kernel
84 # use base KernelManager for each Kernel
85 km = self.kernel_manager_factory(connection_file=os.path.join(
85 km = self.kernel_manager_factory(connection_file=os.path.join(
86 self.connection_dir, "kernel-%s.json" % kernel_id),
86 self.connection_dir, "kernel-%s.json" % kernel_id),
87 config=self.config,
87 config=self.config,
88 )
88 )
89 km.start_kernel(**kwargs)
89 km.start_kernel(**kwargs)
90 # start the shell channel, needed for graceful restart
91 km.start_channels(shell=True, sub=False, stdin=False, hb=False)
90 self._kernels[kernel_id] = km
92 self._kernels[kernel_id] = km
91 return kernel_id
93 return kernel_id
92
94
93 def kill_kernel(self, kernel_id):
95 def kill_kernel(self, kernel_id):
94 """Kill a kernel by its kernel uuid.
96 """Kill a kernel by its kernel uuid.
95
97
96 Parameters
98 Parameters
97 ==========
99 ==========
98 kernel_id : uuid
100 kernel_id : uuid
99 The id of the kernel to kill.
101 The id of the kernel to kill.
100 """
102 """
101 self.get_kernel(kernel_id).kill_kernel()
103 self.get_kernel(kernel_id).kill_kernel()
102 del self._kernels[kernel_id]
104 del self._kernels[kernel_id]
103
105
104 def interrupt_kernel(self, kernel_id):
106 def interrupt_kernel(self, kernel_id):
105 """Interrupt (SIGINT) the kernel by its uuid.
107 """Interrupt (SIGINT) the kernel by its uuid.
106
108
107 Parameters
109 Parameters
108 ==========
110 ==========
109 kernel_id : uuid
111 kernel_id : uuid
110 The id of the kernel to interrupt.
112 The id of the kernel to interrupt.
111 """
113 """
112 return self.get_kernel(kernel_id).interrupt_kernel()
114 return self.get_kernel(kernel_id).interrupt_kernel()
113
115
114 def signal_kernel(self, kernel_id, signum):
116 def signal_kernel(self, kernel_id, signum):
115 """ Sends a signal to the kernel by its uuid.
117 """ Sends a signal to the kernel by its uuid.
116
118
117 Note that since only SIGTERM is supported on Windows, this function
119 Note that since only SIGTERM is supported on Windows, this function
118 is only useful on Unix systems.
120 is only useful on Unix systems.
119
121
120 Parameters
122 Parameters
121 ==========
123 ==========
122 kernel_id : uuid
124 kernel_id : uuid
123 The id of the kernel to signal.
125 The id of the kernel to signal.
124 """
126 """
125 return self.get_kernel(kernel_id).signal_kernel(signum)
127 return self.get_kernel(kernel_id).signal_kernel(signum)
126
128
127 def get_kernel(self, kernel_id):
129 def get_kernel(self, kernel_id):
128 """Get the single KernelManager object for a kernel by its uuid.
130 """Get the single KernelManager object for a kernel by its uuid.
129
131
130 Parameters
132 Parameters
131 ==========
133 ==========
132 kernel_id : uuid
134 kernel_id : uuid
133 The id of the kernel.
135 The id of the kernel.
134 """
136 """
135 km = self._kernels.get(kernel_id)
137 km = self._kernels.get(kernel_id)
136 if km is not None:
138 if km is not None:
137 return km
139 return km
138 else:
140 else:
139 raise KeyError("Kernel with id not found: %s" % kernel_id)
141 raise KeyError("Kernel with id not found: %s" % kernel_id)
140
142
141 def get_kernel_ports(self, kernel_id):
143 def get_kernel_ports(self, kernel_id):
142 """Return a dictionary of ports for a kernel.
144 """Return a dictionary of ports for a kernel.
143
145
144 Parameters
146 Parameters
145 ==========
147 ==========
146 kernel_id : uuid
148 kernel_id : uuid
147 The id of the kernel.
149 The id of the kernel.
148
150
149 Returns
151 Returns
150 =======
152 =======
151 port_dict : dict
153 port_dict : dict
152 A dict of key, value pairs where the keys are the names
154 A dict of key, value pairs where the keys are the names
153 (stdin_port,iopub_port,shell_port) and the values are the
155 (stdin_port,iopub_port,shell_port) and the values are the
154 integer port numbers for those channels.
156 integer port numbers for those channels.
155 """
157 """
156 # this will raise a KeyError if not found:
158 # this will raise a KeyError if not found:
157 km = self.get_kernel(kernel_id)
159 km = self.get_kernel(kernel_id)
158 return dict(shell_port=km.shell_port,
160 return dict(shell_port=km.shell_port,
159 iopub_port=km.iopub_port,
161 iopub_port=km.iopub_port,
160 stdin_port=km.stdin_port,
162 stdin_port=km.stdin_port,
161 hb_port=km.hb_port,
163 hb_port=km.hb_port,
162 )
164 )
163
165
164 def get_kernel_ip(self, kernel_id):
166 def get_kernel_ip(self, kernel_id):
165 """Return ip address for a kernel.
167 """Return ip address for a kernel.
166
168
167 Parameters
169 Parameters
168 ==========
170 ==========
169 kernel_id : uuid
171 kernel_id : uuid
170 The id of the kernel.
172 The id of the kernel.
171
173
172 Returns
174 Returns
173 =======
175 =======
174 ip : str
176 ip : str
175 The ip address of the kernel.
177 The ip address of the kernel.
176 """
178 """
177 return self.get_kernel(kernel_id).ip
179 return self.get_kernel(kernel_id).ip
178
180
179 def create_connected_stream(self, ip, port, socket_type):
181 def create_connected_stream(self, ip, port, socket_type):
180 sock = self.context.socket(socket_type)
182 sock = self.context.socket(socket_type)
181 addr = "tcp://%s:%i" % (ip, port)
183 addr = "tcp://%s:%i" % (ip, port)
182 self.log.info("Connecting to: %s" % addr)
184 self.log.info("Connecting to: %s" % addr)
183 sock.connect(addr)
185 sock.connect(addr)
184 return ZMQStream(sock)
186 return ZMQStream(sock)
185
187
186 def create_iopub_stream(self, kernel_id):
188 def create_iopub_stream(self, kernel_id):
187 ip = self.get_kernel_ip(kernel_id)
189 ip = self.get_kernel_ip(kernel_id)
188 ports = self.get_kernel_ports(kernel_id)
190 ports = self.get_kernel_ports(kernel_id)
189 iopub_stream = self.create_connected_stream(ip, ports['iopub_port'], zmq.SUB)
191 iopub_stream = self.create_connected_stream(ip, ports['iopub_port'], zmq.SUB)
190 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
192 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
191 return iopub_stream
193 return iopub_stream
192
194
193 def create_shell_stream(self, kernel_id):
195 def create_shell_stream(self, kernel_id):
194 ip = self.get_kernel_ip(kernel_id)
196 ip = self.get_kernel_ip(kernel_id)
195 ports = self.get_kernel_ports(kernel_id)
197 ports = self.get_kernel_ports(kernel_id)
196 shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.DEALER)
198 shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.DEALER)
197 return shell_stream
199 return shell_stream
198
200
199 def create_hb_stream(self, kernel_id):
201 def create_hb_stream(self, kernel_id):
200 ip = self.get_kernel_ip(kernel_id)
202 ip = self.get_kernel_ip(kernel_id)
201 ports = self.get_kernel_ports(kernel_id)
203 ports = self.get_kernel_ports(kernel_id)
202 hb_stream = self.create_connected_stream(ip, ports['hb_port'], zmq.REQ)
204 hb_stream = self.create_connected_stream(ip, ports['hb_port'], zmq.REQ)
203 return hb_stream
205 return hb_stream
204
206
205
207
206 class MappingKernelManager(MultiKernelManager):
208 class MappingKernelManager(MultiKernelManager):
207 """A KernelManager that handles notebok mapping and HTTP error handling"""
209 """A KernelManager that handles notebok mapping and HTTP error handling"""
208
210
209 kernel_argv = List(Unicode)
211 kernel_argv = List(Unicode)
210
212
211 time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
213 time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
212 first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
214 first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
213
215
214 max_msg_size = Integer(65536, config=True, help="""
216 max_msg_size = Integer(65536, config=True, help="""
215 The max raw message size accepted from the browser
217 The max raw message size accepted from the browser
216 over a WebSocket connection.
218 over a WebSocket connection.
217 """)
219 """)
218
220
219 _notebook_mapping = Dict()
221 _notebook_mapping = Dict()
220
222
221 #-------------------------------------------------------------------------
223 #-------------------------------------------------------------------------
222 # Methods for managing kernels and sessions
224 # Methods for managing kernels and sessions
223 #-------------------------------------------------------------------------
225 #-------------------------------------------------------------------------
224
226
225 def kernel_for_notebook(self, notebook_id):
227 def kernel_for_notebook(self, notebook_id):
226 """Return the kernel_id for a notebook_id or None."""
228 """Return the kernel_id for a notebook_id or None."""
227 return self._notebook_mapping.get(notebook_id)
229 return self._notebook_mapping.get(notebook_id)
228
230
229 def set_kernel_for_notebook(self, notebook_id, kernel_id):
231 def set_kernel_for_notebook(self, notebook_id, kernel_id):
230 """Associate a notebook with a kernel."""
232 """Associate a notebook with a kernel."""
231 if notebook_id is not None:
233 if notebook_id is not None:
232 self._notebook_mapping[notebook_id] = kernel_id
234 self._notebook_mapping[notebook_id] = kernel_id
233
235
234 def notebook_for_kernel(self, kernel_id):
236 def notebook_for_kernel(self, kernel_id):
235 """Return the notebook_id for a kernel_id or None."""
237 """Return the notebook_id for a kernel_id or None."""
236 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
238 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
237 if len(notebook_ids) == 1:
239 if len(notebook_ids) == 1:
238 return notebook_ids[0]
240 return notebook_ids[0]
239 else:
241 else:
240 return None
242 return None
241
243
242 def delete_mapping_for_kernel(self, kernel_id):
244 def delete_mapping_for_kernel(self, kernel_id):
243 """Remove the kernel/notebook mapping for kernel_id."""
245 """Remove the kernel/notebook mapping for kernel_id."""
244 notebook_id = self.notebook_for_kernel(kernel_id)
246 notebook_id = self.notebook_for_kernel(kernel_id)
245 if notebook_id is not None:
247 if notebook_id is not None:
246 del self._notebook_mapping[notebook_id]
248 del self._notebook_mapping[notebook_id]
247
249
248 def start_kernel(self, notebook_id=None):
250 def start_kernel(self, notebook_id=None):
249 """Start a kernel for a notebok an return its kernel_id.
251 """Start a kernel for a notebok an return its kernel_id.
250
252
251 Parameters
253 Parameters
252 ----------
254 ----------
253 notebook_id : uuid
255 notebook_id : uuid
254 The uuid of the notebook to associate the new kernel with. If this
256 The uuid of the notebook to associate the new kernel with. If this
255 is not None, this kernel will be persistent whenever the notebook
257 is not None, this kernel will be persistent whenever the notebook
256 requests a kernel.
258 requests a kernel.
257 """
259 """
258 kernel_id = self.kernel_for_notebook(notebook_id)
260 kernel_id = self.kernel_for_notebook(notebook_id)
259 if kernel_id is None:
261 if kernel_id is None:
260 kwargs = dict()
262 kwargs = dict()
261 kwargs['extra_arguments'] = self.kernel_argv
263 kwargs['extra_arguments'] = self.kernel_argv
262 kernel_id = super(MappingKernelManager, self).start_kernel(**kwargs)
264 kernel_id = super(MappingKernelManager, self).start_kernel(**kwargs)
263 self.set_kernel_for_notebook(notebook_id, kernel_id)
265 self.set_kernel_for_notebook(notebook_id, kernel_id)
264 self.log.info("Kernel started: %s" % kernel_id)
266 self.log.info("Kernel started: %s" % kernel_id)
265 self.log.debug("Kernel args: %r" % kwargs)
267 self.log.debug("Kernel args: %r" % kwargs)
266 else:
268 else:
267 self.log.info("Using existing kernel: %s" % kernel_id)
269 self.log.info("Using existing kernel: %s" % kernel_id)
268 return kernel_id
270 return kernel_id
269
271
270 def kill_kernel(self, kernel_id):
272 def kill_kernel(self, kernel_id):
271 """Kill a kernel and remove its notebook association."""
273 """Kill a kernel and remove its notebook association."""
272 self._check_kernel_id(kernel_id)
274 self._check_kernel_id(kernel_id)
273 super(MappingKernelManager, self).kill_kernel(kernel_id)
275 super(MappingKernelManager, self).kill_kernel(kernel_id)
274 self.delete_mapping_for_kernel(kernel_id)
276 self.delete_mapping_for_kernel(kernel_id)
275 self.log.info("Kernel killed: %s" % kernel_id)
277 self.log.info("Kernel killed: %s" % kernel_id)
276
278
277 def interrupt_kernel(self, kernel_id):
279 def interrupt_kernel(self, kernel_id):
278 """Interrupt a kernel."""
280 """Interrupt a kernel."""
279 self._check_kernel_id(kernel_id)
281 self._check_kernel_id(kernel_id)
280 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
282 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
281 self.log.info("Kernel interrupted: %s" % kernel_id)
283 self.log.info("Kernel interrupted: %s" % kernel_id)
282
284
283 def restart_kernel(self, kernel_id):
285 def restart_kernel(self, kernel_id):
284 """Restart a kernel while keeping clients connected."""
286 """Restart a kernel while keeping clients connected."""
285 self._check_kernel_id(kernel_id)
287 self._check_kernel_id(kernel_id)
286 km = self.get_kernel(kernel_id)
288 km = self.get_kernel(kernel_id)
287 km.restart_kernel(now=True)
289 km.restart_kernel()
288 self.log.info("Kernel restarted: %s" % kernel_id)
290 self.log.info("Kernel restarted: %s" % kernel_id)
289 return kernel_id
291 return kernel_id
290
292
291 # the following remains, in case the KM restart machinery is
293 # the following remains, in case the KM restart machinery is
292 # somehow unacceptable
294 # somehow unacceptable
293 # Get the notebook_id to preserve the kernel/notebook association.
295 # Get the notebook_id to preserve the kernel/notebook association.
294 notebook_id = self.notebook_for_kernel(kernel_id)
296 notebook_id = self.notebook_for_kernel(kernel_id)
295 # Create the new kernel first so we can move the clients over.
297 # Create the new kernel first so we can move the clients over.
296 new_kernel_id = self.start_kernel()
298 new_kernel_id = self.start_kernel()
297 # Now kill the old kernel.
299 # Now kill the old kernel.
298 self.kill_kernel(kernel_id)
300 self.kill_kernel(kernel_id)
299 # Now save the new kernel/notebook association. We have to save it
301 # Now save the new kernel/notebook association. We have to save it
300 # after the old kernel is killed as that will delete the mapping.
302 # after the old kernel is killed as that will delete the mapping.
301 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
303 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
302 self.log.info("Kernel restarted: %s" % new_kernel_id)
304 self.log.info("Kernel restarted: %s" % new_kernel_id)
303 return new_kernel_id
305 return new_kernel_id
304
306
305 def create_iopub_stream(self, kernel_id):
307 def create_iopub_stream(self, kernel_id):
306 """Create a new iopub stream."""
308 """Create a new iopub stream."""
307 self._check_kernel_id(kernel_id)
309 self._check_kernel_id(kernel_id)
308 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
310 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
309
311
310 def create_shell_stream(self, kernel_id):
312 def create_shell_stream(self, kernel_id):
311 """Create a new shell stream."""
313 """Create a new shell stream."""
312 self._check_kernel_id(kernel_id)
314 self._check_kernel_id(kernel_id)
313 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
315 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
314
316
315 def create_hb_stream(self, kernel_id):
317 def create_hb_stream(self, kernel_id):
316 """Create a new hb stream."""
318 """Create a new hb stream."""
317 self._check_kernel_id(kernel_id)
319 self._check_kernel_id(kernel_id)
318 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
320 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
319
321
320 def _check_kernel_id(self, kernel_id):
322 def _check_kernel_id(self, kernel_id):
321 """Check a that a kernel_id exists and raise 404 if not."""
323 """Check a that a kernel_id exists and raise 404 if not."""
322 if kernel_id not in self:
324 if kernel_id not in self:
323 raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
325 raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
324
326
General Comments 0
You need to be logged in to leave comments. Login now