##// END OF EJS Templates
fix KeyError creating ZMQStreams in notebook...
MinRK -
Show More
@@ -1,362 +1,364 b''
1 1 """A kernel manager for multiple kernels.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 20 import uuid
21 21
22 22 import zmq
23 23 from zmq.eventloop.zmqstream import ZMQStream
24 24
25 25 from tornado import web
26 26
27 27 from IPython.config.configurable import LoggingConfigurable
28 28 from IPython.utils.importstring import import_item
29 29 from IPython.utils.traitlets import (
30 30 Instance, Dict, List, Unicode, Float, Integer, Any, DottedObjectName,
31 31 )
32 32 #-----------------------------------------------------------------------------
33 33 # Classes
34 34 #-----------------------------------------------------------------------------
35 35
36 36 class DuplicateKernelError(Exception):
37 37 pass
38 38
39 39
40 40 class MultiKernelManager(LoggingConfigurable):
41 41 """A class for managing multiple kernels."""
42 42
43 43 kernel_manager_class = DottedObjectName(
44 44 "IPython.zmq.blockingkernelmanager.BlockingKernelManager", config=True,
45 45 help="""The kernel manager class. This is configurable to allow
46 46 subclassing of the KernelManager for customized behavior.
47 47 """
48 48 )
49 49 def _kernel_manager_class_changed(self, name, old, new):
50 50 self.kernel_manager_factory = import_item(new)
51 51
52 52 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 53 def _kernel_manager_factory_default(self):
54 54 return import_item(self.kernel_manager_class)
55 55
56 56 context = Instance('zmq.Context')
57 57 def _context_default(self):
58 58 return zmq.Context.instance()
59 59
60 60 connection_dir = Unicode('')
61 61
62 62 _kernels = Dict()
63 63
64 64 def list_kernel_ids(self):
65 65 """Return a list of the kernel ids of the active kernels."""
66 66 # Create a copy so we can iterate over kernels in operations
67 67 # that delete keys.
68 68 return list(self._kernels.keys())
69 69
70 70 def __len__(self):
71 71 """Return the number of running kernels."""
72 72 return len(self.list_kernel_ids())
73 73
74 74 def __contains__(self, kernel_id):
75 75 return kernel_id in self._kernels
76 76
77 77 def start_kernel(self, **kwargs):
78 78 """Start a new kernel.
79 79
80 80 The caller can pick a kernel_id by passing one in as a keyword arg,
81 81 otherwise one will be picked using a uuid.
82 82
83 83 To silence the kernel's stdout/stderr, call this using::
84 84
85 85 km.start_kernel(stdout=PIPE, stderr=PIPE)
86 86
87 87 """
88 88 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
89 89 if kernel_id in self:
90 90 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
91 91 # kernel_manager_factory is the constructor for the KernelManager
92 92 # subclass we are using. It can be configured as any Configurable,
93 93 # including things like its transport and ip.
94 94 km = self.kernel_manager_factory(connection_file=os.path.join(
95 95 self.connection_dir, "kernel-%s.json" % kernel_id),
96 96 config=self.config,
97 97 )
98 98 km.start_kernel(**kwargs)
99 99 # start just the shell channel, needed for graceful restart
100 100 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
101 101 self._kernels[kernel_id] = km
102 102 return kernel_id
103 103
104 104 def shutdown_kernel(self, kernel_id, now=False):
105 105 """Shutdown a kernel by its kernel uuid.
106 106
107 107 Parameters
108 108 ==========
109 109 kernel_id : uuid
110 110 The id of the kernel to shutdown.
111 111 now : bool
112 112 Should the kernel be shutdown forcibly using a signal.
113 113 """
114 114 k = self.get_kernel(kernel_id)
115 115 k.shutdown_kernel(now=now)
116 116 k.shell_channel.stop()
117 117 del self._kernels[kernel_id]
118 118
119 119 def shutdown_all(self, now=False):
120 120 """Shutdown all kernels."""
121 121 for kid in self.list_kernel_ids():
122 122 self.shutdown_kernel(kid, now=now)
123 123
124 124 def interrupt_kernel(self, kernel_id):
125 125 """Interrupt (SIGINT) the kernel by its uuid.
126 126
127 127 Parameters
128 128 ==========
129 129 kernel_id : uuid
130 130 The id of the kernel to interrupt.
131 131 """
132 132 return self.get_kernel(kernel_id).interrupt_kernel()
133 133
134 134 def signal_kernel(self, kernel_id, signum):
135 135 """Sends a signal to the kernel by its uuid.
136 136
137 137 Note that since only SIGTERM is supported on Windows, this function
138 138 is only useful on Unix systems.
139 139
140 140 Parameters
141 141 ==========
142 142 kernel_id : uuid
143 143 The id of the kernel to signal.
144 144 """
145 145 return self.get_kernel(kernel_id).signal_kernel(signum)
146 146
147 147 def restart_kernel(self, kernel_id):
148 148 """Restart a kernel by its uuid, keeping the same ports.
149 149
150 150 Parameters
151 151 ==========
152 152 kernel_id : uuid
153 153 The id of the kernel to interrupt.
154 154 """
155 155 return self.get_kernel(kernel_id).restart_kernel()
156 156
157 157 def get_kernel(self, kernel_id):
158 158 """Get the single KernelManager object for a kernel by its uuid.
159 159
160 160 Parameters
161 161 ==========
162 162 kernel_id : uuid
163 163 The id of the kernel.
164 164 """
165 165 km = self._kernels.get(kernel_id)
166 166 if km is not None:
167 167 return km
168 168 else:
169 169 raise KeyError("Kernel with id not found: %s" % kernel_id)
170 170
171 171 def get_connection_info(self, kernel_id):
172 172 """Return a dictionary of connection data for a kernel.
173 173
174 174 Parameters
175 175 ==========
176 176 kernel_id : uuid
177 177 The id of the kernel.
178 178
179 179 Returns
180 180 =======
181 181 connection_dict : dict
182 182 A dict of the information needed to connect to a kernel.
183 183 This includes the ip address and the integer port
184 184 numbers of the different channels (stdin_port, iopub_port,
185 185 shell_port, hb_port).
186 186 """
187 187 km = self.get_kernel(kernel_id)
188 188 return dict(transport=km.transport,
189 189 ip=km.ip,
190 190 shell_port=km.shell_port,
191 191 iopub_port=km.iopub_port,
192 192 stdin_port=km.stdin_port,
193 193 hb_port=km.hb_port,
194 194 )
195 195
196 196 def _make_url(self, transport, ip, port):
197 197 """Make a ZeroMQ URL for a given transport, ip and port."""
198 198 if transport == 'tcp':
199 199 return "tcp://%s:%i" % (ip, port)
200 200 else:
201 201 return "%s://%s-%s" % (transport, ip, port)
202 202
203 def _create_connected_stream(self, kernel_id, socket_type):
203 def _create_connected_stream(self, kernel_id, socket_type, channel):
204 204 """Create a connected ZMQStream for a kernel."""
205 205 cinfo = self.get_connection_info(kernel_id)
206 url = self._make_url(cinfo['transport'], cinfo['ip'], cinfo['port'])
206 url = self._make_url(cinfo['transport'], cinfo['ip'],
207 cinfo['%s_port' % channel]
208 )
207 209 sock = self.context.socket(socket_type)
208 210 self.log.info("Connecting to: %s" % url)
209 211 sock.connect(url)
210 212 return ZMQStream(sock)
211 213
212 214 def create_iopub_stream(self, kernel_id):
213 215 """Return a ZMQStream object connected to the iopub channel.
214 216
215 217 Parameters
216 218 ==========
217 219 kernel_id : uuid
218 220 The id of the kernel.
219 221
220 222 Returns
221 223 =======
222 224 stream : ZMQStream
223 225 """
224 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB)
226 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
225 227 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
226 228 return iopub_stream
227 229
228 230 def create_shell_stream(self, kernel_id):
229 231 """Return a ZMQStream object connected to the shell channel.
230 232
231 233 Parameters
232 234 ==========
233 235 kernel_id : uuid
234 236 The id of the kernel.
235 237
236 238 Returns
237 239 =======
238 240 stream : ZMQStream
239 241 """
240 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER)
242 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
241 243 return shell_stream
242 244
243 245 def create_hb_stream(self, kernel_id):
244 246 """Return a ZMQStream object connected to the hb channel.
245 247
246 248 Parameters
247 249 ==========
248 250 kernel_id : uuid
249 251 The id of the kernel.
250 252
251 253 Returns
252 254 =======
253 255 stream : ZMQStream
254 256 """
255 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ)
257 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
256 258 return hb_stream
257 259
258 260
259 261 class MappingKernelManager(MultiKernelManager):
260 262 """A KernelManager that handles notebok mapping and HTTP error handling"""
261 263
262 264 kernel_argv = List(Unicode)
263 265
264 266 time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
265 267 first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
266 268
267 269 max_msg_size = Integer(65536, config=True, help="""
268 270 The max raw message size accepted from the browser
269 271 over a WebSocket connection.
270 272 """)
271 273
272 274 _notebook_mapping = Dict()
273 275
274 276 #-------------------------------------------------------------------------
275 277 # Methods for managing kernels and sessions
276 278 #-------------------------------------------------------------------------
277 279
278 280 def kernel_for_notebook(self, notebook_id):
279 281 """Return the kernel_id for a notebook_id or None."""
280 282 return self._notebook_mapping.get(notebook_id)
281 283
282 284 def set_kernel_for_notebook(self, notebook_id, kernel_id):
283 285 """Associate a notebook with a kernel."""
284 286 if notebook_id is not None:
285 287 self._notebook_mapping[notebook_id] = kernel_id
286 288
287 289 def notebook_for_kernel(self, kernel_id):
288 290 """Return the notebook_id for a kernel_id or None."""
289 291 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
290 292 if len(notebook_ids) == 1:
291 293 return notebook_ids[0]
292 294 else:
293 295 return None
294 296
295 297 def delete_mapping_for_kernel(self, kernel_id):
296 298 """Remove the kernel/notebook mapping for kernel_id."""
297 299 notebook_id = self.notebook_for_kernel(kernel_id)
298 300 if notebook_id is not None:
299 301 del self._notebook_mapping[notebook_id]
300 302
301 303 def start_kernel(self, notebook_id=None, **kwargs):
302 304 """Start a kernel for a notebok an return its kernel_id.
303 305
304 306 Parameters
305 307 ----------
306 308 notebook_id : uuid
307 309 The uuid of the notebook to associate the new kernel with. If this
308 310 is not None, this kernel will be persistent whenever the notebook
309 311 requests a kernel.
310 312 """
311 313 kernel_id = self.kernel_for_notebook(notebook_id)
312 314 if kernel_id is None:
313 315 kwargs['extra_arguments'] = self.kernel_argv
314 316 kernel_id = super(MappingKernelManager, self).start_kernel(**kwargs)
315 317 self.set_kernel_for_notebook(notebook_id, kernel_id)
316 318 self.log.info("Kernel started: %s" % kernel_id)
317 319 self.log.debug("Kernel args: %r" % kwargs)
318 320 else:
319 321 self.log.info("Using existing kernel: %s" % kernel_id)
320 322 return kernel_id
321 323
322 324 def shutdown_kernel(self, kernel_id, now=False):
323 325 """Shutdown a kernel and remove its notebook association."""
324 326 self._check_kernel_id(kernel_id)
325 327 super(MappingKernelManager, self).shutdown_kernel(
326 328 kernel_id, now=now
327 329 )
328 330 self.delete_mapping_for_kernel(kernel_id)
329 331 self.log.info("Kernel shutdown: %s" % kernel_id)
330 332
331 333 def interrupt_kernel(self, kernel_id):
332 334 """Interrupt a kernel."""
333 335 self._check_kernel_id(kernel_id)
334 336 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
335 337 self.log.info("Kernel interrupted: %s" % kernel_id)
336 338
337 339 def restart_kernel(self, kernel_id):
338 340 """Restart a kernel while keeping clients connected."""
339 341 self._check_kernel_id(kernel_id)
340 342 super(MappingKernelManager, self).restart_kernel(kernel_id)
341 343 self.log.info("Kernel restarted: %s" % kernel_id)
342 344
343 345 def create_iopub_stream(self, kernel_id):
344 346 """Create a new iopub stream."""
345 347 self._check_kernel_id(kernel_id)
346 348 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
347 349
348 350 def create_shell_stream(self, kernel_id):
349 351 """Create a new shell stream."""
350 352 self._check_kernel_id(kernel_id)
351 353 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
352 354
353 355 def create_hb_stream(self, kernel_id):
354 356 """Create a new hb stream."""
355 357 self._check_kernel_id(kernel_id)
356 358 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
357 359
358 360 def _check_kernel_id(self, kernel_id):
359 361 """Check a that a kernel_id exists and raise 404 if not."""
360 362 if kernel_id not in self:
361 363 raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
362 364
General Comments 0
You need to be logged in to leave comments. Login now