##// END OF EJS Templates
actually send only one kernel_info request...
Min RK -
Show More
@@ -1,294 +1,295 b''
1 1 """Tornado handlers for kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7 import logging
8 8 from tornado import gen, web
9 9 from tornado.concurrent import Future
10 10 from tornado.ioloop import IOLoop
11 11
12 12 from IPython.utils.jsonutil import date_default
13 13 from IPython.utils.py3compat import cast_unicode
14 14 from IPython.html.utils import url_path_join, url_escape
15 15
16 16 from ...base.handlers import IPythonHandler, json_errors
17 17 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
18 18
19 19 from IPython.core.release import kernel_protocol_version
20 20
21 21 class MainKernelHandler(IPythonHandler):
22 22
23 23 @web.authenticated
24 24 @json_errors
25 25 def get(self):
26 26 km = self.kernel_manager
27 27 self.finish(json.dumps(km.list_kernels()))
28 28
29 29 @web.authenticated
30 30 @json_errors
31 31 def post(self):
32 32 km = self.kernel_manager
33 33 model = self.get_json_body()
34 34 if model is None:
35 35 model = {
36 36 'name': km.default_kernel_name
37 37 }
38 38 else:
39 39 model.setdefault('name', km.default_kernel_name)
40 40
41 41 kernel_id = km.start_kernel(kernel_name=model['name'])
42 42 model = km.kernel_model(kernel_id)
43 43 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
44 44 self.set_header('Location', url_escape(location))
45 45 self.set_status(201)
46 46 self.finish(json.dumps(model))
47 47
48 48
49 49 class KernelHandler(IPythonHandler):
50 50
51 51 SUPPORTED_METHODS = ('DELETE', 'GET')
52 52
53 53 @web.authenticated
54 54 @json_errors
55 55 def get(self, kernel_id):
56 56 km = self.kernel_manager
57 57 km._check_kernel_id(kernel_id)
58 58 model = km.kernel_model(kernel_id)
59 59 self.finish(json.dumps(model))
60 60
61 61 @web.authenticated
62 62 @json_errors
63 63 def delete(self, kernel_id):
64 64 km = self.kernel_manager
65 65 km.shutdown_kernel(kernel_id)
66 66 self.set_status(204)
67 67 self.finish()
68 68
69 69
70 70 class KernelActionHandler(IPythonHandler):
71 71
72 72 @web.authenticated
73 73 @json_errors
74 74 def post(self, kernel_id, action):
75 75 km = self.kernel_manager
76 76 if action == 'interrupt':
77 77 km.interrupt_kernel(kernel_id)
78 78 self.set_status(204)
79 79 if action == 'restart':
80 80 km.restart_kernel(kernel_id)
81 81 model = km.kernel_model(kernel_id)
82 82 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
83 83 self.write(json.dumps(model))
84 84 self.finish()
85 85
86 86
87 87 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
88 88
89 89 @property
90 90 def kernel_info_timeout(self):
91 91 return self.settings.get('kernel_info_timeout', 10)
92 92
93 93 def __repr__(self):
94 94 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
95 95
96 96 def create_stream(self):
97 97 km = self.kernel_manager
98 98 meth = getattr(km, 'connect_%s' % self.channel)
99 99 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
100 100
101 101 def request_kernel_info(self):
102 102 """send a request for kernel_info"""
103 103 km = self.kernel_manager
104 104 kernel = km.get_kernel(self.kernel_id)
105 105 try:
106 # check for cached value
107 kernel_info = kernel._kernel_info
106 # check for previous request
107 future = kernel._kernel_info_future
108 108 except AttributeError:
109 109 self.log.debug("Requesting kernel info from %s", self.kernel_id)
110 110 # Create a kernel_info channel to query the kernel protocol version.
111 111 # This channel will be closed after the kernel_info reply is received.
112 112 if self.kernel_info_channel is None:
113 113 self.kernel_info_channel = km.connect_shell(self.kernel_id)
114 114 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
115 115 self.session.send(self.kernel_info_channel, "kernel_info_request")
116 # store the future on the kernel, so only one request is sent
117 kernel._kernel_info_future = self._kernel_info_future
116 118 else:
117 # use cached value, don't resend request
118 self._finish_kernel_info(kernel_info)
119 if not future.done():
120 self.log.debug("Waiting for pending kernel_info request")
121 future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
119 122 return self._kernel_info_future
120 123
121 124 def _handle_kernel_info_reply(self, msg):
122 125 """process the kernel_info_reply
123 126
124 127 enabling msg spec adaptation, if necessary
125 128 """
126 129 idents,msg = self.session.feed_identities(msg)
127 130 try:
128 131 msg = self.session.deserialize(msg)
129 132 except:
130 133 self.log.error("Bad kernel_info reply", exc_info=True)
131 self._kernel_info_future.set_result(None)
134 self._kernel_info_future.set_result({})
132 135 return
133 136 else:
134 137 info = msg['content']
135 138 self.log.debug("Received kernel info: %s", info)
136 139 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
137 140 self.log.error("Kernel info request failed, assuming current %s", info)
138 else:
139 kernel = self.kernel_manager.get_kernel(self.kernel_id)
140 kernel._kernel_info = info
141 info = {}
141 142 self._finish_kernel_info(info)
142 143
143 144 # close the kernel_info channel, we don't need it anymore
144 145 if self.kernel_info_channel:
145 146 self.kernel_info_channel.close()
146 147 self.kernel_info_channel = None
147 148
148 149 def _finish_kernel_info(self, info):
149 150 """Finish handling kernel_info reply
150 151
151 152 Set up protocol adaptation, if needed,
152 153 and signal that connection can continue.
153 154 """
154 155 protocol_version = info.get('protocol_version', kernel_protocol_version)
155 156 if protocol_version != kernel_protocol_version:
156 157 self.session.adapt_version = int(protocol_version.split('.')[0])
157 158 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
158 159 if not self._kernel_info_future.done():
159 160 self._kernel_info_future.set_result(info)
160 161
161 162 def initialize(self):
162 163 super(ZMQChannelHandler, self).initialize()
163 164 self.zmq_stream = None
164 165 self.kernel_id = None
165 166 self.kernel_info_channel = None
166 167 self._kernel_info_future = Future()
167 168
168 169 @gen.coroutine
169 170 def pre_get(self):
170 171 # authenticate first
171 172 super(ZMQChannelHandler, self).pre_get()
172 173 # then request kernel info, waiting up to a certain time before giving up.
173 174 # We don't want to wait forever, because browsers don't take it well when
174 175 # servers never respond to websocket connection requests.
175 176 future = self.request_kernel_info()
176 177
177 178 def give_up():
178 179 """Don't wait forever for the kernel to reply"""
179 180 if future.done():
180 181 return
181 182 self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
182 future.set_result(None)
183 future.set_result({})
183 184 loop = IOLoop.current()
184 185 loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
185 186 # actually wait for it
186 187 yield future
187 188
188 189 @gen.coroutine
189 190 def get(self, kernel_id):
190 191 self.kernel_id = cast_unicode(kernel_id, 'ascii')
191 192 yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
192 193
193 194 def open(self, kernel_id):
194 195 super(ZMQChannelHandler, self).open()
195 196 try:
196 197 self.create_stream()
197 198 except web.HTTPError as e:
198 199 self.log.error("Error opening stream: %s", e)
199 200 # WebSockets don't response to traditional error codes so we
200 201 # close the connection.
201 202 if not self.stream.closed():
202 203 self.stream.close()
203 204 self.close()
204 205 else:
205 206 self.zmq_stream.on_recv(self._on_zmq_reply)
206 207
207 208 def on_message(self, msg):
208 209 if self.zmq_stream is None:
209 210 return
210 211 elif self.zmq_stream.closed():
211 212 self.log.info("%s closed, closing websocket.", self)
212 213 self.close()
213 214 return
214 215 if isinstance(msg, bytes):
215 216 msg = deserialize_binary_message(msg)
216 217 else:
217 218 msg = json.loads(msg)
218 219 self.session.send(self.zmq_stream, msg)
219 220
220 221 def on_close(self):
221 222 # This method can be called twice, once by self.kernel_died and once
222 223 # from the WebSocket close event. If the WebSocket connection is
223 224 # closed before the ZMQ streams are setup, they could be None.
224 225 if self.zmq_stream is not None and not self.zmq_stream.closed():
225 226 self.zmq_stream.on_recv(None)
226 227 # close the socket directly, don't wait for the stream
227 228 socket = self.zmq_stream.socket
228 229 self.zmq_stream.close()
229 230 socket.close()
230 231
231 232
232 233 class IOPubHandler(ZMQChannelHandler):
233 234 channel = 'iopub'
234 235
235 236 def create_stream(self):
236 237 super(IOPubHandler, self).create_stream()
237 238 km = self.kernel_manager
238 239 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
239 240 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
240 241
241 242 def on_close(self):
242 243 km = self.kernel_manager
243 244 if self.kernel_id in km:
244 245 km.remove_restart_callback(
245 246 self.kernel_id, self.on_kernel_restarted,
246 247 )
247 248 km.remove_restart_callback(
248 249 self.kernel_id, self.on_restart_failed, 'dead',
249 250 )
250 251 super(IOPubHandler, self).on_close()
251 252
252 253 def _send_status_message(self, status):
253 254 msg = self.session.msg("status",
254 255 {'execution_state': status}
255 256 )
256 257 self.write_message(json.dumps(msg, default=date_default))
257 258
258 259 def on_kernel_restarted(self):
259 260 logging.warn("kernel %s restarted", self.kernel_id)
260 261 self._send_status_message('restarting')
261 262
262 263 def on_restart_failed(self):
263 264 logging.error("kernel %s restarted failed!", self.kernel_id)
264 265 self._send_status_message('dead')
265 266
266 267 def on_message(self, msg):
267 268 """IOPub messages make no sense"""
268 269 pass
269 270
270 271
271 272 class ShellHandler(ZMQChannelHandler):
272 273 channel = 'shell'
273 274
274 275
275 276 class StdinHandler(ZMQChannelHandler):
276 277 channel = 'stdin'
277 278
278 279
279 280 #-----------------------------------------------------------------------------
280 281 # URL to handler mappings
281 282 #-----------------------------------------------------------------------------
282 283
283 284
284 285 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
285 286 _kernel_action_regex = r"(?P<action>restart|interrupt)"
286 287
287 288 default_handlers = [
288 289 (r"/api/kernels", MainKernelHandler),
289 290 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
290 291 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
291 292 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
292 293 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
293 294 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
294 295 ]
General Comments 0
You need to be logged in to leave comments. Login now