##// END OF EJS Templates
Don't resend kernel info requests if a bad reply is received
MinRK -
Show More
@@ -1,269 +1,269 b''
1 1 """Tornado handlers for kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7 import logging
8 8 from tornado import gen, web
9 9 from tornado.concurrent import Future
10 10
11 11 from IPython.utils.jsonutil import date_default
12 12 from IPython.utils.py3compat import cast_unicode
13 13 from IPython.html.utils import url_path_join, url_escape
14 14
15 15 from ...base.handlers import IPythonHandler, json_errors
16 16 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
17 17
18 18 from IPython.core.release import kernel_protocol_version
19 19
20 20 class MainKernelHandler(IPythonHandler):
21 21
22 22 @web.authenticated
23 23 @json_errors
24 24 def get(self):
25 25 km = self.kernel_manager
26 26 self.finish(json.dumps(km.list_kernels()))
27 27
28 28 @web.authenticated
29 29 @json_errors
30 30 def post(self):
31 31 km = self.kernel_manager
32 32 model = self.get_json_body()
33 33 if model is None:
34 34 model = {
35 35 'name': km.default_kernel_name
36 36 }
37 37 else:
38 38 model.setdefault('name', km.default_kernel_name)
39 39
40 40 kernel_id = km.start_kernel(kernel_name=model['name'])
41 41 model = km.kernel_model(kernel_id)
42 42 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
43 43 self.set_header('Location', url_escape(location))
44 44 self.set_status(201)
45 45 self.finish(json.dumps(model))
46 46
47 47
48 48 class KernelHandler(IPythonHandler):
49 49
50 50 SUPPORTED_METHODS = ('DELETE', 'GET')
51 51
52 52 @web.authenticated
53 53 @json_errors
54 54 def get(self, kernel_id):
55 55 km = self.kernel_manager
56 56 km._check_kernel_id(kernel_id)
57 57 model = km.kernel_model(kernel_id)
58 58 self.finish(json.dumps(model))
59 59
60 60 @web.authenticated
61 61 @json_errors
62 62 def delete(self, kernel_id):
63 63 km = self.kernel_manager
64 64 km.shutdown_kernel(kernel_id)
65 65 self.set_status(204)
66 66 self.finish()
67 67
68 68
69 69 class KernelActionHandler(IPythonHandler):
70 70
71 71 @web.authenticated
72 72 @json_errors
73 73 def post(self, kernel_id, action):
74 74 km = self.kernel_manager
75 75 if action == 'interrupt':
76 76 km.interrupt_kernel(kernel_id)
77 77 self.set_status(204)
78 78 if action == 'restart':
79 79 km.restart_kernel(kernel_id)
80 80 model = km.kernel_model(kernel_id)
81 81 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
82 82 self.write(json.dumps(model))
83 83 self.finish()
84 84
85 85
86 86 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
87 87
88 88 def __repr__(self):
89 89 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
90 90
91 91 def create_stream(self):
92 92 km = self.kernel_manager
93 93 meth = getattr(km, 'connect_%s' % self.channel)
94 94 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
95 95
96 96 def request_kernel_info(self):
97 97 """send a request for kernel_info"""
98 98 km = self.kernel_manager
99 99 kernel = km.get_kernel(self.kernel_id)
100 100 try:
101 101 # check for cached value
102 102 kernel_info = kernel._kernel_info
103 103 except AttributeError:
104 104 self.log.debug("Requesting kernel info from %s", self.kernel_id)
105 105 # Create a kernel_info channel to query the kernel protocol version.
106 106 # This channel will be closed after the kernel_info reply is received.
107 107 if self.kernel_info_channel is None:
108 108 self.kernel_info_channel = km.connect_shell(self.kernel_id)
109 109 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
110 110 self.session.send(self.kernel_info_channel, "kernel_info_request")
111 111 else:
112 112 # use cached value, don't resend request
113 113 self._finish_kernel_info(kernel_info)
114 114 return self._kernel_info_future
115 115
116 116 def _handle_kernel_info_reply(self, msg):
117 117 """process the kernel_info_reply
118 118
119 119 enabling msg spec adaptation, if necessary
120 120 """
121 121 idents,msg = self.session.feed_identities(msg)
122 122 try:
123 123 msg = self.session.deserialize(msg)
124 124 except:
125 125 self.log.error("Bad kernel_info reply", exc_info=True)
126 self.request_kernel_info()
126 self._kernel_info_future.set_result(None)
127 127 return
128 128 else:
129 129 info = msg['content']
130 130 self.log.debug("Received kernel info: %s", info)
131 131 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
132 132 self.log.error("Kernel info request failed, assuming current %s", info)
133 133 else:
134 134 kernel = self.kernel_manager.get_kernel(self.kernel_id)
135 135 kernel._kernel_info = info
136 136 self._finish_kernel_info(info)
137 137
138 138 # close the kernel_info channel, we don't need it anymore
139 139 if self.kernel_info_channel:
140 140 self.kernel_info_channel.close()
141 141 self.kernel_info_channel = None
142 142
143 143 def _finish_kernel_info(self, info):
144 144 """Finish handling kernel_info reply
145 145
146 146 Set up protocol adaptation, if needed,
147 147 and signal that connection can continue.
148 148 """
149 149 protocol_version = info.get('protocol_version', kernel_protocol_version)
150 150 if protocol_version != kernel_protocol_version:
151 151 self.session.adapt_version = int(protocol_version.split('.')[0])
152 152 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
153 153 self._kernel_info_future.set_result(info)
154 154
155 155 def initialize(self):
156 156 super(ZMQChannelHandler, self).initialize()
157 157 self.zmq_stream = None
158 158 self.kernel_id = None
159 159 self.kernel_info_channel = None
160 160 self._kernel_info_future = Future()
161 161
162 162 @gen.coroutine
163 163 def get(self, kernel_id):
164 164 self.kernel_id = cast_unicode(kernel_id, 'ascii')
165 165 yield self.request_kernel_info()
166 166 super(ZMQChannelHandler, self).get(kernel_id)
167 167
168 168 def open(self, kernel_id):
169 169 super(ZMQChannelHandler, self).open()
170 170 try:
171 171 self.create_stream()
172 172 except web.HTTPError as e:
173 173 self.log.error("Error opening stream: %s", e)
174 174 # WebSockets don't response to traditional error codes so we
175 175 # close the connection.
176 176 if not self.stream.closed():
177 177 self.stream.close()
178 178 self.close()
179 179 else:
180 180 self.zmq_stream.on_recv(self._on_zmq_reply)
181 181
182 182 def on_message(self, msg):
183 183 if self.zmq_stream is None:
184 184 return
185 185 elif self.zmq_stream.closed():
186 186 self.log.info("%s closed, closing websocket.", self)
187 187 self.close()
188 188 return
189 189 if isinstance(msg, bytes):
190 190 msg = deserialize_binary_message(msg)
191 191 else:
192 192 msg = json.loads(msg)
193 193 self.session.send(self.zmq_stream, msg)
194 194
195 195 def on_close(self):
196 196 # This method can be called twice, once by self.kernel_died and once
197 197 # from the WebSocket close event. If the WebSocket connection is
198 198 # closed before the ZMQ streams are setup, they could be None.
199 199 if self.zmq_stream is not None and not self.zmq_stream.closed():
200 200 self.zmq_stream.on_recv(None)
201 201 # close the socket directly, don't wait for the stream
202 202 socket = self.zmq_stream.socket
203 203 self.zmq_stream.close()
204 204 socket.close()
205 205
206 206
207 207 class IOPubHandler(ZMQChannelHandler):
208 208 channel = 'iopub'
209 209
210 210 def create_stream(self):
211 211 super(IOPubHandler, self).create_stream()
212 212 km = self.kernel_manager
213 213 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
214 214 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
215 215
216 216 def on_close(self):
217 217 km = self.kernel_manager
218 218 if self.kernel_id in km:
219 219 km.remove_restart_callback(
220 220 self.kernel_id, self.on_kernel_restarted,
221 221 )
222 222 km.remove_restart_callback(
223 223 self.kernel_id, self.on_restart_failed, 'dead',
224 224 )
225 225 super(IOPubHandler, self).on_close()
226 226
227 227 def _send_status_message(self, status):
228 228 msg = self.session.msg("status",
229 229 {'execution_state': status}
230 230 )
231 231 self.write_message(json.dumps(msg, default=date_default))
232 232
233 233 def on_kernel_restarted(self):
234 234 logging.warn("kernel %s restarted", self.kernel_id)
235 235 self._send_status_message('restarting')
236 236
237 237 def on_restart_failed(self):
238 238 logging.error("kernel %s restarted failed!", self.kernel_id)
239 239 self._send_status_message('dead')
240 240
241 241 def on_message(self, msg):
242 242 """IOPub messages make no sense"""
243 243 pass
244 244
245 245
246 246 class ShellHandler(ZMQChannelHandler):
247 247 channel = 'shell'
248 248
249 249
250 250 class StdinHandler(ZMQChannelHandler):
251 251 channel = 'stdin'
252 252
253 253
254 254 #-----------------------------------------------------------------------------
255 255 # URL to handler mappings
256 256 #-----------------------------------------------------------------------------
257 257
258 258
259 259 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
260 260 _kernel_action_regex = r"(?P<action>restart|interrupt)"
261 261
262 262 default_handlers = [
263 263 (r"/api/kernels", MainKernelHandler),
264 264 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
265 265 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
266 266 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
267 267 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
268 268 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
269 269 ]
General Comments 0
You need to be logged in to leave comments. Login now