##// END OF EJS Templates
Fixed subtle bug in kernel restarting....
Brian E. Granger -
Show More
@@ -90,14 +90,12 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
90 rkm = self.application.routing_kernel_manager
90 rkm = self.application.routing_kernel_manager
91 self.router = rkm.get_router(kernel_id, self.stream_name)
91 self.router = rkm.get_router(kernel_id, self.stream_name)
92 self.client_id = self.router.register_client(self)
92 self.client_id = self.router.register_client(self)
93 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
94
93
95 def on_message(self, msg):
94 def on_message(self, msg):
96 self.router.forward_msg(self.client_id, msg)
95 self.router.forward_msg(self.client_id, msg)
97
96
98 def on_close(self):
97 def on_close(self):
99 self.router.unregister_client(self.client_id)
98 self.router.unregister_client(self.client_id)
100 logging.info("Connection closed: %s" % self.client_id)
101
99
102
100
103 #-----------------------------------------------------------------------------
101 #-----------------------------------------------------------------------------
@@ -207,9 +207,20 b' class RoutingKernelManager(LoggingConfigurable):'
207
207
208 @property
208 @property
209 def kernel_ids(self):
209 def kernel_ids(self):
210 """List the kernel ids."""
210 return self.kernel_manager.kernel_ids
211 return self.kernel_manager.kernel_ids
211
212
213 def kernel_for_notebook(self, notebook_id):
214 """Return the kernel_id for a notebook_id or None."""
215 return self._notebook_mapping.get(notebook_id)
216
217 def set_kernel_for_notebook(self, notebook_id, kernel_id):
218 """Associate a notebook with a kernel."""
219 if notebook_id is not None:
220 self._notebook_mapping[notebook_id] = kernel_id
221
212 def notebook_for_kernel(self, kernel_id):
222 def notebook_for_kernel(self, kernel_id):
223 """Return the notebook_id for a kernel_id or None."""
213 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
224 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
214 if len(notebook_ids) == 1:
225 if len(notebook_ids) == 1:
215 return notebook_ids[0]
226 return notebook_ids[0]
@@ -217,20 +228,29 b' class RoutingKernelManager(LoggingConfigurable):'
217 return None
228 return None
218
229
219 def delete_mapping_for_kernel(self, kernel_id):
230 def delete_mapping_for_kernel(self, kernel_id):
231 """Remove the kernel/notebook mapping for kernel_id."""
220 notebook_id = self.notebook_for_kernel(kernel_id)
232 notebook_id = self.notebook_for_kernel(kernel_id)
221 if notebook_id is not None:
233 if notebook_id is not None:
222 del self._notebook_mapping[notebook_id]
234 del self._notebook_mapping[notebook_id]
223
235
224 def start_kernel(self, notebook_id=None):
236 def start_kernel(self, notebook_id=None):
237 """Start a kernel an return its kernel_id.
238
239 Parameters
240 ----------
241 notebook_id : uuid
242 The uuid of the notebook to associate the new kernel with. If this
243 is not None, this kernel will be persistent whenever the notebook
244 requests a kernel.
245 """
225 self.log.info
246 self.log.info
226 kernel_id = self._notebook_mapping.get(notebook_id)
247 kernel_id = self.kernel_for_notebook(notebook_id)
227 if kernel_id is None:
248 if kernel_id is None:
228 kwargs = dict()
249 kwargs = dict()
229 kwargs['extra_arguments'] = self.kernel_argv
250 kwargs['extra_arguments'] = self.kernel_argv
230 kernel_id = self.kernel_manager.start_kernel(**kwargs)
251 kernel_id = self.kernel_manager.start_kernel(**kwargs)
231 if notebook_id is not None:
252 self.set_kernel_for_notebook(notebook_id, kernel_id)
232 self._notebook_mapping[notebook_id] = kernel_id
253 self.log.info("Kernel started: %s" % kernel_id)
233 self.log.info("Kernel started for notebook %s: %s" % (notebook_id,kernel_id))
234 self.log.debug("Kernel args: %r" % kwargs)
254 self.log.debug("Kernel args: %r" % kwargs)
235 self.start_session_manager(kernel_id)
255 self.start_session_manager(kernel_id)
236 else:
256 else:
@@ -238,6 +258,7 b' class RoutingKernelManager(LoggingConfigurable):'
238 return kernel_id
258 return kernel_id
239
259
240 def start_session_manager(self, kernel_id):
260 def start_session_manager(self, kernel_id):
261 """Start the ZMQ sockets (a "session") to connect to a kernel."""
241 sm = self.kernel_manager.create_session_manager(kernel_id)
262 sm = self.kernel_manager.create_session_manager(kernel_id)
242 self._session_dict[kernel_id] = sm
263 self._session_dict[kernel_id] = sm
243 iopub_stream = sm.get_iopub_stream()
264 iopub_stream = sm.get_iopub_stream()
@@ -248,10 +269,11 b' class RoutingKernelManager(LoggingConfigurable):'
248 shell_router = ShellStreamRouter(
269 shell_router = ShellStreamRouter(
249 zmq_stream=shell_stream, session=sm.session, config=self.config
270 zmq_stream=shell_stream, session=sm.session, config=self.config
250 )
271 )
251 self._routers[(kernel_id, 'iopub')] = iopub_router
272 self.set_router(kernel_id, 'iopub', iopub_router)
252 self._routers[(kernel_id, 'shell')] = shell_router
273 self.set_router(kernel_id, 'shell', shell_router)
253
274
254 def kill_kernel(self, kernel_id):
275 def kill_kernel(self, kernel_id):
276 """Kill a kernel and remove its notebook association."""
255 if kernel_id not in self.kernel_manager:
277 if kernel_id not in self.kernel_manager:
256 raise web.HTTPError(404)
278 raise web.HTTPError(404)
257 try:
279 try:
@@ -264,12 +286,14 b' class RoutingKernelManager(LoggingConfigurable):'
264 self.log.info("Kernel killed: %s" % kernel_id)
286 self.log.info("Kernel killed: %s" % kernel_id)
265
287
266 def interrupt_kernel(self, kernel_id):
288 def interrupt_kernel(self, kernel_id):
289 """Interrupt a kernel."""
267 if kernel_id not in self.kernel_manager:
290 if kernel_id not in self.kernel_manager:
268 raise web.HTTPError(404)
291 raise web.HTTPError(404)
269 self.kernel_manager.interrupt_kernel(kernel_id)
292 self.kernel_manager.interrupt_kernel(kernel_id)
270 self.log.debug("Kernel interrupted: %s" % kernel_id)
293 self.log.debug("Kernel interrupted: %s" % kernel_id)
271
294
272 def restart_kernel(self, kernel_id):
295 def restart_kernel(self, kernel_id):
296 """Restart a kernel while keeping clients connected."""
273 if kernel_id not in self.kernel_manager:
297 if kernel_id not in self.kernel_manager:
274 raise web.HTTPError(404)
298 raise web.HTTPError(404)
275
299
@@ -286,6 +310,14 b' class RoutingKernelManager(LoggingConfigurable):'
286 new_iopub_router.copy_clients(old_iopub_router)
310 new_iopub_router.copy_clients(old_iopub_router)
287 new_shell_router.copy_clients(old_shell_router)
311 new_shell_router.copy_clients(old_shell_router)
288
312
313 # Shut down the old routers
314 old_shell_router.close()
315 old_iopub_router.close()
316 self.delete_router(kernel_id, 'shell')
317 self.delete_router(kernel_id, 'iopub')
318 del old_shell_router
319 del old_iopub_router
320
289 # Now shutdown the old session and the kernel.
321 # Now shutdown the old session and the kernel.
290 # TODO: This causes a hard crash in ZMQStream.close, which sets
322 # TODO: This causes a hard crash in ZMQStream.close, which sets
291 # self.socket to None to hastily. We will need to fix this in PyZMQ
323 # self.socket to None to hastily. We will need to fix this in PyZMQ
@@ -295,12 +327,24 b' class RoutingKernelManager(LoggingConfigurable):'
295
327
296 # Now save the new kernel/notebook association. We have to save it
328 # Now save the new kernel/notebook association. We have to save it
297 # after the old kernel is killed as that will delete the mapping.
329 # after the old kernel is killed as that will delete the mapping.
298 self._notebook_mapping[notebook_id] = kernel_id
330 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
299
331
300 self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id))
332 self.log.debug("Kernel restarted: %s" % new_kernel_id)
301 return new_kernel_id
333 return new_kernel_id
302
334
303 def get_router(self, kernel_id, stream_name):
335 def get_router(self, kernel_id, stream_name):
336 """Return the router for a given kernel_id and stream name."""
304 router = self._routers[(kernel_id, stream_name)]
337 router = self._routers[(kernel_id, stream_name)]
305 return router
338 return router
306
339
340 def set_router(self, kernel_id, stream_name, router):
341 """Set the router for a given kernel_id and stream_name."""
342 self._routers[(kernel_id, stream_name)] = router
343
344 def delete_router(self, kernel_id, stream_name):
345 """Delete a router for a kernel_id and stream_name."""
346 try:
347 del self._routers[(kernel_id, stream_name)]
348 except KeyError:
349 pass
350
@@ -37,9 +37,17 b' class ZMQStreamRouter(Configurable):'
37 super(ZMQStreamRouter,self).__init__(**kwargs)
37 super(ZMQStreamRouter,self).__init__(**kwargs)
38 self.zmq_stream.on_recv(self._on_zmq_reply)
38 self.zmq_stream.on_recv(self._on_zmq_reply)
39
39
40 def __del__(self):
41 self.close()
42
43 def close(self):
44 """Disable the routing actions of this router."""
45 self._clients = {}
46 self.zmq_stream.on_recv(None)
47
40 def register_client(self, client):
48 def register_client(self, client):
41 """Register a client, returning a client uuid."""
49 """Register a client, returning a client uuid."""
42 client_id = uuid.uuid4()
50 client_id = unicode(uuid.uuid4())
43 self._clients[client_id] = client
51 self._clients[client_id] = client
44 return client_id
52 return client_id
45
53
@@ -112,7 +120,6 b' class ShellStreamRouter(ZMQStreamRouter):'
112 def forward_msg(self, client_id, msg):
120 def forward_msg(self, client_id, msg):
113 if len(msg) < self.max_msg_size:
121 if len(msg) < self.max_msg_size:
114 msg = json.loads(msg)
122 msg = json.loads(msg)
115 # to_send = self.session.serialize(msg)
116 self._request_queue.put(client_id)
123 self._request_queue.put(client_id)
117 self.session.send(self.zmq_stream, msg)
124 self.session.send(self.zmq_stream, msg)
118
125
General Comments 0
You need to be logged in to leave comments. Login now