##// END OF EJS Templates
Fixed subtle bug in kernel restarting....
Brian E. Granger -
Show More
@@ -1,160 +1,158 b''
1 1 """Tornado handlers for the notebook."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Imports
5 5 #-----------------------------------------------------------------------------
6 6
7 7 import json
8 8 import logging
9 9 import urllib
10 10
11 11 from tornado import web
12 12 from tornado import websocket
13 13
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Top-level handlers
17 17 #-----------------------------------------------------------------------------
18 18
19 19
20 20 class NBBrowserHandler(web.RequestHandler):
21 21 def get(self):
22 22 nbm = self.application.notebook_manager
23 23 project = nbm.notebook_dir
24 24 self.render('nbbrowser.html', project=project)
25 25
26 26
27 27 class NewHandler(web.RequestHandler):
28 28 def get(self):
29 29 notebook_id = self.application.notebook_manager.new_notebook()
30 30 self.render('notebook.html', notebook_id=notebook_id)
31 31
32 32
33 33 class NamedNotebookHandler(web.RequestHandler):
34 34 def get(self, notebook_id):
35 35 nbm = self.application.notebook_manager
36 36 if not nbm.notebook_exists(notebook_id):
37 37 raise web.HTTPError(404)
38 38 self.render('notebook.html', notebook_id=notebook_id)
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Kernel handlers
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 class MainKernelHandler(web.RequestHandler):
47 47
48 48 def get(self):
49 49 rkm = self.application.routing_kernel_manager
50 50 self.finish(json.dumps(rkm.kernel_ids))
51 51
52 52 def post(self):
53 53 rkm = self.application.routing_kernel_manager
54 54 notebook_id = self.get_argument('notebook', default=None)
55 55 kernel_id = rkm.start_kernel(notebook_id)
56 56 self.set_header('Location', '/'+kernel_id)
57 57 self.finish(json.dumps(kernel_id))
58 58
59 59
60 60 class KernelHandler(web.RequestHandler):
61 61
62 62 SUPPORTED_METHODS = ('DELETE')
63 63
64 64 def delete(self, kernel_id):
65 65 rkm = self.application.routing_kernel_manager
66 66 self.kill_kernel(kernel_id)
67 67 self.set_status(204)
68 68 self.finish()
69 69
70 70
71 71 class KernelActionHandler(web.RequestHandler):
72 72
73 73 def post(self, kernel_id, action):
74 74 rkm = self.application.routing_kernel_manager
75 75 if action == 'interrupt':
76 76 rkm.interrupt_kernel(kernel_id)
77 77 self.set_status(204)
78 78 if action == 'restart':
79 79 new_kernel_id = rkm.restart_kernel(kernel_id)
80 80 self.write(json.dumps(new_kernel_id))
81 81 self.finish()
82 82
83 83
84 84 class ZMQStreamHandler(websocket.WebSocketHandler):
85 85
86 86 def initialize(self, stream_name):
87 87 self.stream_name = stream_name
88 88
89 89 def open(self, kernel_id):
90 90 rkm = self.application.routing_kernel_manager
91 91 self.router = rkm.get_router(kernel_id, self.stream_name)
92 92 self.client_id = self.router.register_client(self)
93 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
94 93
95 94 def on_message(self, msg):
96 95 self.router.forward_msg(self.client_id, msg)
97 96
98 97 def on_close(self):
99 98 self.router.unregister_client(self.client_id)
100 logging.info("Connection closed: %s" % self.client_id)
101 99
102 100
103 101 #-----------------------------------------------------------------------------
104 102 # Notebook web service handlers
105 103 #-----------------------------------------------------------------------------
106 104
107 105 class NotebookRootHandler(web.RequestHandler):
108 106
109 107 def get(self):
110 108 nbm = self.application.notebook_manager
111 109 files = nbm.list_notebooks()
112 110 self.finish(json.dumps(files))
113 111
114 112 def post(self):
115 113 nbm = self.application.notebook_manager
116 114 body = self.request.body.strip()
117 115 format = self.get_argument('format', default='json')
118 116 name = self.get_argument('name', default=None)
119 117 if body:
120 118 notebook_id = nbm.save_new_notebook(body, name=name, format=format)
121 119 else:
122 120 notebook_id = nbm.new_notebook()
123 121 self.set_header('Location', '/'+notebook_id)
124 122 self.finish(json.dumps(notebook_id))
125 123
126 124
127 125 class NotebookHandler(web.RequestHandler):
128 126
129 127 SUPPORTED_METHODS = ('GET', 'PUT', 'DELETE')
130 128
131 129 def get(self, notebook_id):
132 130 nbm = self.application.notebook_manager
133 131 format = self.get_argument('format', default='json')
134 132 last_mod, name, data = nbm.get_notebook(notebook_id, format)
135 133 if format == u'json':
136 134 self.set_header('Content-Type', 'application/json')
137 135 self.set_header('Content-Disposition','attachment; filename=%s.json' % name)
138 136 elif format == u'xml':
139 137 self.set_header('Content-Type', 'application/xml')
140 138 self.set_header('Content-Disposition','attachment; filename=%s.ipynb' % name)
141 139 elif format == u'py':
142 140 self.set_header('Content-Type', 'application/x-python')
143 141 self.set_header('Content-Disposition','attachment; filename=%s.py' % name)
144 142 self.set_header('Last-Modified', last_mod)
145 143 self.finish(data)
146 144
147 145 def put(self, notebook_id):
148 146 nbm = self.application.notebook_manager
149 147 format = self.get_argument('format', default='json')
150 148 name = self.get_argument('name', default=None)
151 149 nbm.save_notebook(notebook_id, self.request.body, name=name, format=format)
152 150 self.set_status(204)
153 151 self.finish()
154 152
155 153 def delete(self, notebook_id):
156 154 nbm = self.application.notebook_manager
157 155 nbm.delete_notebook(notebook_id)
158 156 self.set_status(204)
159 157 self.finish()
160 158
@@ -1,306 +1,350 b''
1 1 """A kernel manager for multiple kernels."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING.txt, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 import signal
15 15 import sys
16 16 import uuid
17 17
18 18 import zmq
19 19
20 20 from tornado import web
21 21
22 22 from .routers import IOPubStreamRouter, ShellStreamRouter
23 23
24 24 from IPython.config.configurable import LoggingConfigurable
25 25 from IPython.zmq.ipkernel import launch_kernel
26 26 from IPython.utils.traitlets import Instance, Dict, List, Unicode
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Classes
30 30 #-----------------------------------------------------------------------------
31 31
32 32 class DuplicateKernelError(Exception):
33 33 pass
34 34
35 35
36 36 class KernelManager(LoggingConfigurable):
37 37 """A class for managing multiple kernels."""
38 38
39 39 context = Instance('zmq.Context')
40 40 def _context_default(self):
41 41 return zmq.Context.instance()
42 42
43 43 _kernels = Dict()
44 44
45 45 @property
46 46 def kernel_ids(self):
47 47 """Return a list of the kernel ids of the active kernels."""
48 48 return self._kernels.keys()
49 49
50 50 def __len__(self):
51 51 """Return the number of running kernels."""
52 52 return len(self.kernel_ids)
53 53
54 54 def __contains__(self, kernel_id):
55 55 if kernel_id in self.kernel_ids:
56 56 return True
57 57 else:
58 58 return False
59 59
60 60 def start_kernel(self, **kwargs):
61 61 """Start a new kernel."""
62 62 kernel_id = unicode(uuid.uuid4())
63 63 (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(**kwargs)
64 64 # Store the information for contacting the kernel. This assumes the kernel is
65 65 # running on localhost.
66 66 d = dict(
67 67 process = process,
68 68 stdin_port = stdin_port,
69 69 iopub_port = iopub_port,
70 70 shell_port = shell_port,
71 71 hb_port = hb_port,
72 72 ip = '127.0.0.1'
73 73 )
74 74 self._kernels[kernel_id] = d
75 75 return kernel_id
76 76
77 77 def kill_kernel(self, kernel_id):
78 78 """Kill a kernel by its kernel uuid.
79 79
80 80 Parameters
81 81 ==========
82 82 kernel_id : uuid
83 83 The id of the kernel to kill.
84 84 """
85 85 kernel_process = self.get_kernel_process(kernel_id)
86 86 if kernel_process is not None:
87 87 # Attempt to kill the kernel.
88 88 try:
89 89 kernel_process.kill()
90 90 except OSError, e:
91 91 # In Windows, we will get an Access Denied error if the process
92 92 # has already terminated. Ignore it.
93 93 if not (sys.platform == 'win32' and e.winerror == 5):
94 94 raise
95 95 del self._kernels[kernel_id]
96 96
97 97 def interrupt_kernel(self, kernel_id):
98 98 """Interrupt (SIGINT) the kernel by its uuid.
99 99
100 100 Parameters
101 101 ==========
102 102 kernel_id : uuid
103 103 The id of the kernel to interrupt.
104 104 """
105 105 kernel_process = self.get_kernel_process(kernel_id)
106 106 if kernel_process is not None:
107 107 if sys.platform == 'win32':
108 108 from parentpoller import ParentPollerWindows as Poller
109 109 Poller.send_interrupt(kernel_process.win32_interrupt_event)
110 110 else:
111 111 kernel_process.send_signal(signal.SIGINT)
112 112
113 113 def signal_kernel(self, kernel_id, signum):
114 114 """ Sends a signal to the kernel by its uuid.
115 115
116 116 Note that since only SIGTERM is supported on Windows, this function
117 117 is only useful on Unix systems.
118 118
119 119 Parameters
120 120 ==========
121 121 kernel_id : uuid
122 122 The id of the kernel to signal.
123 123 """
124 124 kernel_process = self.get_kernel_process(kernel_id)
125 125 if kernel_process is not None:
126 126 kernel_process.send_signal(signum)
127 127
128 128 def get_kernel_process(self, kernel_id):
129 129 """Get the process object for a kernel by its uuid.
130 130
131 131 Parameters
132 132 ==========
133 133 kernel_id : uuid
134 134 The id of the kernel.
135 135 """
136 136 d = self._kernels.get(kernel_id)
137 137 if d is not None:
138 138 return d['process']
139 139 else:
140 140 raise KeyError("Kernel with id not found: %s" % kernel_id)
141 141
142 142 def get_kernel_ports(self, kernel_id):
143 143 """Return a dictionary of ports for a kernel.
144 144
145 145 Parameters
146 146 ==========
147 147 kernel_id : uuid
148 148 The id of the kernel.
149 149
150 150 Returns
151 151 =======
152 152 port_dict : dict
153 153 A dict of key, value pairs where the keys are the names
154 154 (stdin_port,iopub_port,shell_port) and the values are the
155 155 integer port numbers for those channels.
156 156 """
157 157 d = self._kernels.get(kernel_id)
158 158 if d is not None:
159 159 dcopy = d.copy()
160 160 dcopy.pop('process')
161 161 dcopy.pop('ip')
162 162 return dcopy
163 163 else:
164 164 raise KeyError("Kernel with id not found: %s" % kernel_id)
165 165
166 166 def get_kernel_ip(self, kernel_id):
167 167 """Return ip address for a kernel.
168 168
169 169 Parameters
170 170 ==========
171 171 kernel_id : uuid
172 172 The id of the kernel.
173 173
174 174 Returns
175 175 =======
176 176 ip : str
177 177 The ip address of the kernel.
178 178 """
179 179 d = self._kernels.get(kernel_id)
180 180 if d is not None:
181 181 return d['ip']
182 182 else:
183 183 raise KeyError("Kernel with id not found: %s" % kernel_id)
184 184
185 185 def create_session_manager(self, kernel_id):
186 186 """Create a new session manager for a kernel by its uuid."""
187 187 from sessionmanager import SessionManager
188 188 return SessionManager(
189 189 kernel_id=kernel_id, kernel_manager=self,
190 190 config=self.config, context=self.context, log=self.log
191 191 )
192 192
193 193
194 194 class RoutingKernelManager(LoggingConfigurable):
195 195 """A KernelManager that handles WebSocket routing and HTTP error handling"""
196 196
197 197 kernel_argv = List(Unicode)
198 198 kernel_manager = Instance(KernelManager)
199 199
200 200 _routers = Dict()
201 201 _session_dict = Dict()
202 202 _notebook_mapping = Dict()
203 203
204 204 #-------------------------------------------------------------------------
205 205 # Methods for managing kernels and sessions
206 206 #-------------------------------------------------------------------------
207 207
208 208 @property
209 209 def kernel_ids(self):
210 """List the kernel ids."""
210 211 return self.kernel_manager.kernel_ids
211 212
213 def kernel_for_notebook(self, notebook_id):
214 """Return the kernel_id for a notebook_id or None."""
215 return self._notebook_mapping.get(notebook_id)
216
217 def set_kernel_for_notebook(self, notebook_id, kernel_id):
218 """Associate a notebook with a kernel."""
219 if notebook_id is not None:
220 self._notebook_mapping[notebook_id] = kernel_id
221
212 222 def notebook_for_kernel(self, kernel_id):
223 """Return the notebook_id for a kernel_id or None."""
213 224 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
214 225 if len(notebook_ids) == 1:
215 226 return notebook_ids[0]
216 227 else:
217 228 return None
218 229
219 230 def delete_mapping_for_kernel(self, kernel_id):
231 """Remove the kernel/notebook mapping for kernel_id."""
220 232 notebook_id = self.notebook_for_kernel(kernel_id)
221 233 if notebook_id is not None:
222 234 del self._notebook_mapping[notebook_id]
223 235
224 236 def start_kernel(self, notebook_id=None):
237 """Start a kernel an return its kernel_id.
238
239 Parameters
240 ----------
241 notebook_id : uuid
242 The uuid of the notebook to associate the new kernel with. If this
243 is not None, this kernel will be persistent whenever the notebook
244 requests a kernel.
245 """
225 246 self.log.info
226 kernel_id = self._notebook_mapping.get(notebook_id)
247 kernel_id = self.kernel_for_notebook(notebook_id)
227 248 if kernel_id is None:
228 249 kwargs = dict()
229 250 kwargs['extra_arguments'] = self.kernel_argv
230 251 kernel_id = self.kernel_manager.start_kernel(**kwargs)
231 if notebook_id is not None:
232 self._notebook_mapping[notebook_id] = kernel_id
233 self.log.info("Kernel started for notebook %s: %s" % (notebook_id,kernel_id))
252 self.set_kernel_for_notebook(notebook_id, kernel_id)
253 self.log.info("Kernel started: %s" % kernel_id)
234 254 self.log.debug("Kernel args: %r" % kwargs)
235 255 self.start_session_manager(kernel_id)
236 256 else:
237 257 self.log.info("Using existing kernel: %s" % kernel_id)
238 258 return kernel_id
239 259
240 260 def start_session_manager(self, kernel_id):
261 """Start the ZMQ sockets (a "session") to connect to a kernel."""
241 262 sm = self.kernel_manager.create_session_manager(kernel_id)
242 263 self._session_dict[kernel_id] = sm
243 264 iopub_stream = sm.get_iopub_stream()
244 265 shell_stream = sm.get_shell_stream()
245 266 iopub_router = IOPubStreamRouter(
246 267 zmq_stream=iopub_stream, session=sm.session, config=self.config
247 268 )
248 269 shell_router = ShellStreamRouter(
249 270 zmq_stream=shell_stream, session=sm.session, config=self.config
250 271 )
251 self._routers[(kernel_id, 'iopub')] = iopub_router
252 self._routers[(kernel_id, 'shell')] = shell_router
272 self.set_router(kernel_id, 'iopub', iopub_router)
273 self.set_router(kernel_id, 'shell', shell_router)
253 274
254 275 def kill_kernel(self, kernel_id):
276 """Kill a kernel and remove its notebook association."""
255 277 if kernel_id not in self.kernel_manager:
256 278 raise web.HTTPError(404)
257 279 try:
258 280 sm = self._session_dict.pop(kernel_id)
259 281 except KeyError:
260 282 raise web.HTTPError(404)
261 283 sm.stop()
262 284 self.kernel_manager.kill_kernel(kernel_id)
263 285 self.delete_mapping_for_kernel(kernel_id)
264 286 self.log.info("Kernel killed: %s" % kernel_id)
265 287
266 288 def interrupt_kernel(self, kernel_id):
289 """Interrupt a kernel."""
267 290 if kernel_id not in self.kernel_manager:
268 291 raise web.HTTPError(404)
269 292 self.kernel_manager.interrupt_kernel(kernel_id)
270 293 self.log.debug("Kernel interrupted: %s" % kernel_id)
271 294
272 295 def restart_kernel(self, kernel_id):
296 """Restart a kernel while keeping clients connected."""
273 297 if kernel_id not in self.kernel_manager:
274 298 raise web.HTTPError(404)
275 299
276 300 # Get the notebook_id to preserve the kernel/notebook association
277 301 notebook_id = self.notebook_for_kernel(kernel_id)
278 302 # Create the new kernel first so we can move the clients over.
279 303 new_kernel_id = self.start_kernel()
280 304
281 305 # Copy the clients over to the new routers.
282 306 old_iopub_router = self.get_router(kernel_id, 'iopub')
283 307 old_shell_router = self.get_router(kernel_id, 'shell')
284 308 new_iopub_router = self.get_router(new_kernel_id, 'iopub')
285 309 new_shell_router = self.get_router(new_kernel_id, 'shell')
286 310 new_iopub_router.copy_clients(old_iopub_router)
287 311 new_shell_router.copy_clients(old_shell_router)
288 312
313 # Shut down the old routers
314 old_shell_router.close()
315 old_iopub_router.close()
316 self.delete_router(kernel_id, 'shell')
317 self.delete_router(kernel_id, 'iopub')
318 del old_shell_router
319 del old_iopub_router
320
289 321 # Now shutdown the old session and the kernel.
290 322 # TODO: This causes a hard crash in ZMQStream.close, which sets
291 323 # self.socket to None to hastily. We will need to fix this in PyZMQ
292 324 # itself. For now, we just leave the old kernel running :(
293 325 # Maybe this is fixed now, but nothing was changed really.
294 326 self.kill_kernel(kernel_id)
295 327
296 328 # Now save the new kernel/notebook association. We have to save it
297 329 # after the old kernel is killed as that will delete the mapping.
298 self._notebook_mapping[notebook_id] = kernel_id
330 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
299 331
300 self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id))
332 self.log.debug("Kernel restarted: %s" % new_kernel_id)
301 333 return new_kernel_id
302 334
303 335 def get_router(self, kernel_id, stream_name):
336 """Return the router for a given kernel_id and stream name."""
304 337 router = self._routers[(kernel_id, stream_name)]
305 338 return router
306 339
340 def set_router(self, kernel_id, stream_name, router):
341 """Set the router for a given kernel_id and stream_name."""
342 self._routers[(kernel_id, stream_name)] = router
343
344 def delete_router(self, kernel_id, stream_name):
345 """Delete a router for a kernel_id and stream_name."""
346 try:
347 del self._routers[(kernel_id, stream_name)]
348 except KeyError:
349 pass
350
@@ -1,118 +1,125 b''
1 1 """Routers that connect WebSockets to ZMQ sockets."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING.txt, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 import uuid
15 15 from Queue import Queue
16 16 import json
17 17
18 18 from IPython.config.configurable import Configurable
19 19 from IPython.utils.traitlets import Instance, Int, Dict
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Classes
23 23 #-----------------------------------------------------------------------------
24 24
25 25 class ZMQStreamRouter(Configurable):
26 26
27 27 zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
28 28 session = Instance('IPython.zmq.session.Session')
29 29 max_msg_size = Int(2048, config=True, help="""
30 30 The max raw message size accepted from the browser
31 31 over a WebSocket connection.
32 32 """)
33 33
34 34 _clients = Dict()
35 35
36 36 def __init__(self, **kwargs):
37 37 super(ZMQStreamRouter,self).__init__(**kwargs)
38 38 self.zmq_stream.on_recv(self._on_zmq_reply)
39 39
40 def __del__(self):
41 self.close()
42
43 def close(self):
44 """Disable the routing actions of this router."""
45 self._clients = {}
46 self.zmq_stream.on_recv(None)
47
40 48 def register_client(self, client):
41 49 """Register a client, returning a client uuid."""
42 client_id = uuid.uuid4()
50 client_id = unicode(uuid.uuid4())
43 51 self._clients[client_id] = client
44 52 return client_id
45 53
46 54 def unregister_client(self, client_id):
47 55 """Unregister a client by its client uuid."""
48 56 del self._clients[client_id]
49 57
50 58 def copy_clients(self, router):
51 59 """Copy the clients of another router to this one.
52 60
53 61 This is used to enable the backend zeromq stream to disconnect
54 62 and reconnect while the WebSocket connections to browsers
55 63 remain, such as when a kernel is restarted.
56 64 """
57 65 for client_id, client in router._clients.items():
58 66 client.router = self
59 67 self._clients[client_id] = client
60 68
61 69 def forward_msg(self, client_id, msg):
62 70 """Forward a msg to a client by its id.
63 71
64 72 The default implementation of this will fail silently if a message
65 73 arrives on a socket that doesn't support it. This method should
66 74 use max_msg_size to check and silently discard message that are too
67 75 long."""
68 76 pass
69 77
70 78 def _on_zmq_reply(self, msg_list):
71 79 """Handle a message the ZMQ stream sends to the router.
72 80
73 81 Usually, this is where the return message will be written to
74 82 clients that need it using client.write_message().
75 83 """
76 84 pass
77 85
78 86 def _reserialize_reply(self, msg_list):
79 87 """Reserialize a reply message using JSON.
80 88
81 89 This takes the msg list from the ZMQ socket, unserializes it using
82 90 self.session and then serializes the result using JSON. This method
83 91 should be used by self._on_zmq_reply to build messages that can
84 92 be sent back to the browser.
85 93 """
86 94 idents, msg_list = self.session.feed_identities(msg_list)
87 95 msg = self.session.unserialize(msg_list)
88 96 msg['header'].pop('date')
89 97 msg.pop('buffers')
90 98 return json.dumps(msg)
91 99
92 100
93 101 class IOPubStreamRouter(ZMQStreamRouter):
94 102
95 103 def _on_zmq_reply(self, msg_list):
96 104 msg = self._reserialize_reply(msg_list)
97 105 for client_id, client in self._clients.items():
98 106 client.write_message(msg)
99 107
100 108
101 109 class ShellStreamRouter(ZMQStreamRouter):
102 110
103 111 _request_queue = Instance(Queue,(),{})
104 112
105 113 def _on_zmq_reply(self, msg_list):
106 114 msg = self._reserialize_reply(msg_list)
107 115 client_id = self._request_queue.get(block=False)
108 116 client = self._clients.get(client_id)
109 117 if client is not None:
110 118 client.write_message(msg)
111 119
112 120 def forward_msg(self, client_id, msg):
113 121 if len(msg) < self.max_msg_size:
114 122 msg = json.loads(msg)
115 # to_send = self.session.serialize(msg)
116 123 self._request_queue.put(client_id)
117 124 self.session.send(self.zmq_stream, msg)
118 125
General Comments 0
You need to be logged in to leave comments. Login now