##// END OF EJS Templates
Fixed subtle bug in kernel restarting....
Brian E. Granger -
Show More
@@ -90,14 +90,12 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
90 90 rkm = self.application.routing_kernel_manager
91 91 self.router = rkm.get_router(kernel_id, self.stream_name)
92 92 self.client_id = self.router.register_client(self)
93 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
94 93
95 94 def on_message(self, msg):
96 95 self.router.forward_msg(self.client_id, msg)
97 96
98 97 def on_close(self):
99 98 self.router.unregister_client(self.client_id)
100 logging.info("Connection closed: %s" % self.client_id)
101 99
102 100
103 101 #-----------------------------------------------------------------------------
@@ -207,9 +207,20 b' class RoutingKernelManager(LoggingConfigurable):'
207 207
208 208 @property
209 209 def kernel_ids(self):
210 """List the kernel ids."""
210 211 return self.kernel_manager.kernel_ids
211 212
213 def kernel_for_notebook(self, notebook_id):
214 """Return the kernel_id for a notebook_id or None."""
215 return self._notebook_mapping.get(notebook_id)
216
217 def set_kernel_for_notebook(self, notebook_id, kernel_id):
218 """Associate a notebook with a kernel."""
219 if notebook_id is not None:
220 self._notebook_mapping[notebook_id] = kernel_id
221
212 222 def notebook_for_kernel(self, kernel_id):
223 """Return the notebook_id for a kernel_id or None."""
213 224 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
214 225 if len(notebook_ids) == 1:
215 226 return notebook_ids[0]
@@ -217,20 +228,29 b' class RoutingKernelManager(LoggingConfigurable):'
217 228 return None
218 229
219 230 def delete_mapping_for_kernel(self, kernel_id):
231 """Remove the kernel/notebook mapping for kernel_id."""
220 232 notebook_id = self.notebook_for_kernel(kernel_id)
221 233 if notebook_id is not None:
222 234 del self._notebook_mapping[notebook_id]
223 235
224 236 def start_kernel(self, notebook_id=None):
237 """Start a kernel an return its kernel_id.
238
239 Parameters
240 ----------
241 notebook_id : uuid
242 The uuid of the notebook to associate the new kernel with. If this
243 is not None, this kernel will be persistent whenever the notebook
244 requests a kernel.
245 """
225 246 self.log.info
226 kernel_id = self._notebook_mapping.get(notebook_id)
247 kernel_id = self.kernel_for_notebook(notebook_id)
227 248 if kernel_id is None:
228 249 kwargs = dict()
229 250 kwargs['extra_arguments'] = self.kernel_argv
230 251 kernel_id = self.kernel_manager.start_kernel(**kwargs)
231 if notebook_id is not None:
232 self._notebook_mapping[notebook_id] = kernel_id
233 self.log.info("Kernel started for notebook %s: %s" % (notebook_id,kernel_id))
252 self.set_kernel_for_notebook(notebook_id, kernel_id)
253 self.log.info("Kernel started: %s" % kernel_id)
234 254 self.log.debug("Kernel args: %r" % kwargs)
235 255 self.start_session_manager(kernel_id)
236 256 else:
@@ -238,6 +258,7 b' class RoutingKernelManager(LoggingConfigurable):'
238 258 return kernel_id
239 259
240 260 def start_session_manager(self, kernel_id):
261 """Start the ZMQ sockets (a "session") to connect to a kernel."""
241 262 sm = self.kernel_manager.create_session_manager(kernel_id)
242 263 self._session_dict[kernel_id] = sm
243 264 iopub_stream = sm.get_iopub_stream()
@@ -248,10 +269,11 b' class RoutingKernelManager(LoggingConfigurable):'
248 269 shell_router = ShellStreamRouter(
249 270 zmq_stream=shell_stream, session=sm.session, config=self.config
250 271 )
251 self._routers[(kernel_id, 'iopub')] = iopub_router
252 self._routers[(kernel_id, 'shell')] = shell_router
272 self.set_router(kernel_id, 'iopub', iopub_router)
273 self.set_router(kernel_id, 'shell', shell_router)
253 274
254 275 def kill_kernel(self, kernel_id):
276 """Kill a kernel and remove its notebook association."""
255 277 if kernel_id not in self.kernel_manager:
256 278 raise web.HTTPError(404)
257 279 try:
@@ -264,12 +286,14 b' class RoutingKernelManager(LoggingConfigurable):'
264 286 self.log.info("Kernel killed: %s" % kernel_id)
265 287
266 288 def interrupt_kernel(self, kernel_id):
289 """Interrupt a kernel."""
267 290 if kernel_id not in self.kernel_manager:
268 291 raise web.HTTPError(404)
269 292 self.kernel_manager.interrupt_kernel(kernel_id)
270 293 self.log.debug("Kernel interrupted: %s" % kernel_id)
271 294
272 295 def restart_kernel(self, kernel_id):
296 """Restart a kernel while keeping clients connected."""
273 297 if kernel_id not in self.kernel_manager:
274 298 raise web.HTTPError(404)
275 299
@@ -286,6 +310,14 b' class RoutingKernelManager(LoggingConfigurable):'
286 310 new_iopub_router.copy_clients(old_iopub_router)
287 311 new_shell_router.copy_clients(old_shell_router)
288 312
313 # Shut down the old routers
314 old_shell_router.close()
315 old_iopub_router.close()
316 self.delete_router(kernel_id, 'shell')
317 self.delete_router(kernel_id, 'iopub')
318 del old_shell_router
319 del old_iopub_router
320
289 321 # Now shutdown the old session and the kernel.
290 322 # TODO: This causes a hard crash in ZMQStream.close, which sets
291 323 # self.socket to None to hastily. We will need to fix this in PyZMQ
@@ -295,12 +327,24 b' class RoutingKernelManager(LoggingConfigurable):'
295 327
296 328 # Now save the new kernel/notebook association. We have to save it
297 329 # after the old kernel is killed as that will delete the mapping.
298 self._notebook_mapping[notebook_id] = kernel_id
330 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
299 331
300 self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id))
332 self.log.debug("Kernel restarted: %s" % new_kernel_id)
301 333 return new_kernel_id
302 334
303 335 def get_router(self, kernel_id, stream_name):
336 """Return the router for a given kernel_id and stream name."""
304 337 router = self._routers[(kernel_id, stream_name)]
305 338 return router
306 339
340 def set_router(self, kernel_id, stream_name, router):
341 """Set the router for a given kernel_id and stream_name."""
342 self._routers[(kernel_id, stream_name)] = router
343
344 def delete_router(self, kernel_id, stream_name):
345 """Delete a router for a kernel_id and stream_name."""
346 try:
347 del self._routers[(kernel_id, stream_name)]
348 except KeyError:
349 pass
350
@@ -37,9 +37,17 b' class ZMQStreamRouter(Configurable):'
37 37 super(ZMQStreamRouter,self).__init__(**kwargs)
38 38 self.zmq_stream.on_recv(self._on_zmq_reply)
39 39
40 def __del__(self):
41 self.close()
42
43 def close(self):
44 """Disable the routing actions of this router."""
45 self._clients = {}
46 self.zmq_stream.on_recv(None)
47
40 48 def register_client(self, client):
41 49 """Register a client, returning a client uuid."""
42 client_id = uuid.uuid4()
50 client_id = unicode(uuid.uuid4())
43 51 self._clients[client_id] = client
44 52 return client_id
45 53
@@ -112,7 +120,6 b' class ShellStreamRouter(ZMQStreamRouter):'
112 120 def forward_msg(self, client_id, msg):
113 121 if len(msg) < self.max_msg_size:
114 122 msg = json.loads(msg)
115 # to_send = self.session.serialize(msg)
116 123 self._request_queue.put(client_id)
117 124 self.session.send(self.zmq_stream, msg)
118 125
General Comments 0
You need to be logged in to leave comments. Login now