##// END OF EJS Templates
Don't resend kernel info requests if a bad reply is received
MinRK -
Show More
@@ -1,269 +1,269 b''
1 """Tornado handlers for kernels."""
1 """Tornado handlers for kernels."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7 import logging
7 import logging
8 from tornado import gen, web
8 from tornado import gen, web
9 from tornado.concurrent import Future
9 from tornado.concurrent import Future
10
10
11 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
12 from IPython.utils.py3compat import cast_unicode
12 from IPython.utils.py3compat import cast_unicode
13 from IPython.html.utils import url_path_join, url_escape
13 from IPython.html.utils import url_path_join, url_escape
14
14
15 from ...base.handlers import IPythonHandler, json_errors
15 from ...base.handlers import IPythonHandler, json_errors
16 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
16 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
17
17
18 from IPython.core.release import kernel_protocol_version
18 from IPython.core.release import kernel_protocol_version
19
19
20 class MainKernelHandler(IPythonHandler):
20 class MainKernelHandler(IPythonHandler):
21
21
22 @web.authenticated
22 @web.authenticated
23 @json_errors
23 @json_errors
24 def get(self):
24 def get(self):
25 km = self.kernel_manager
25 km = self.kernel_manager
26 self.finish(json.dumps(km.list_kernels()))
26 self.finish(json.dumps(km.list_kernels()))
27
27
28 @web.authenticated
28 @web.authenticated
29 @json_errors
29 @json_errors
30 def post(self):
30 def post(self):
31 km = self.kernel_manager
31 km = self.kernel_manager
32 model = self.get_json_body()
32 model = self.get_json_body()
33 if model is None:
33 if model is None:
34 model = {
34 model = {
35 'name': km.default_kernel_name
35 'name': km.default_kernel_name
36 }
36 }
37 else:
37 else:
38 model.setdefault('name', km.default_kernel_name)
38 model.setdefault('name', km.default_kernel_name)
39
39
40 kernel_id = km.start_kernel(kernel_name=model['name'])
40 kernel_id = km.start_kernel(kernel_name=model['name'])
41 model = km.kernel_model(kernel_id)
41 model = km.kernel_model(kernel_id)
42 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
42 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
43 self.set_header('Location', url_escape(location))
43 self.set_header('Location', url_escape(location))
44 self.set_status(201)
44 self.set_status(201)
45 self.finish(json.dumps(model))
45 self.finish(json.dumps(model))
46
46
47
47
48 class KernelHandler(IPythonHandler):
48 class KernelHandler(IPythonHandler):
49
49
50 SUPPORTED_METHODS = ('DELETE', 'GET')
50 SUPPORTED_METHODS = ('DELETE', 'GET')
51
51
52 @web.authenticated
52 @web.authenticated
53 @json_errors
53 @json_errors
54 def get(self, kernel_id):
54 def get(self, kernel_id):
55 km = self.kernel_manager
55 km = self.kernel_manager
56 km._check_kernel_id(kernel_id)
56 km._check_kernel_id(kernel_id)
57 model = km.kernel_model(kernel_id)
57 model = km.kernel_model(kernel_id)
58 self.finish(json.dumps(model))
58 self.finish(json.dumps(model))
59
59
60 @web.authenticated
60 @web.authenticated
61 @json_errors
61 @json_errors
62 def delete(self, kernel_id):
62 def delete(self, kernel_id):
63 km = self.kernel_manager
63 km = self.kernel_manager
64 km.shutdown_kernel(kernel_id)
64 km.shutdown_kernel(kernel_id)
65 self.set_status(204)
65 self.set_status(204)
66 self.finish()
66 self.finish()
67
67
68
68
69 class KernelActionHandler(IPythonHandler):
69 class KernelActionHandler(IPythonHandler):
70
70
71 @web.authenticated
71 @web.authenticated
72 @json_errors
72 @json_errors
73 def post(self, kernel_id, action):
73 def post(self, kernel_id, action):
74 km = self.kernel_manager
74 km = self.kernel_manager
75 if action == 'interrupt':
75 if action == 'interrupt':
76 km.interrupt_kernel(kernel_id)
76 km.interrupt_kernel(kernel_id)
77 self.set_status(204)
77 self.set_status(204)
78 if action == 'restart':
78 if action == 'restart':
79 km.restart_kernel(kernel_id)
79 km.restart_kernel(kernel_id)
80 model = km.kernel_model(kernel_id)
80 model = km.kernel_model(kernel_id)
81 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
81 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
82 self.write(json.dumps(model))
82 self.write(json.dumps(model))
83 self.finish()
83 self.finish()
84
84
85
85
86 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
86 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
87
87
88 def __repr__(self):
88 def __repr__(self):
89 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
89 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
90
90
91 def create_stream(self):
91 def create_stream(self):
92 km = self.kernel_manager
92 km = self.kernel_manager
93 meth = getattr(km, 'connect_%s' % self.channel)
93 meth = getattr(km, 'connect_%s' % self.channel)
94 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
94 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
95
95
96 def request_kernel_info(self):
96 def request_kernel_info(self):
97 """send a request for kernel_info"""
97 """send a request for kernel_info"""
98 km = self.kernel_manager
98 km = self.kernel_manager
99 kernel = km.get_kernel(self.kernel_id)
99 kernel = km.get_kernel(self.kernel_id)
100 try:
100 try:
101 # check for cached value
101 # check for cached value
102 kernel_info = kernel._kernel_info
102 kernel_info = kernel._kernel_info
103 except AttributeError:
103 except AttributeError:
104 self.log.debug("Requesting kernel info from %s", self.kernel_id)
104 self.log.debug("Requesting kernel info from %s", self.kernel_id)
105 # Create a kernel_info channel to query the kernel protocol version.
105 # Create a kernel_info channel to query the kernel protocol version.
106 # This channel will be closed after the kernel_info reply is received.
106 # This channel will be closed after the kernel_info reply is received.
107 if self.kernel_info_channel is None:
107 if self.kernel_info_channel is None:
108 self.kernel_info_channel = km.connect_shell(self.kernel_id)
108 self.kernel_info_channel = km.connect_shell(self.kernel_id)
109 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
109 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
110 self.session.send(self.kernel_info_channel, "kernel_info_request")
110 self.session.send(self.kernel_info_channel, "kernel_info_request")
111 else:
111 else:
112 # use cached value, don't resend request
112 # use cached value, don't resend request
113 self._finish_kernel_info(kernel_info)
113 self._finish_kernel_info(kernel_info)
114 return self._kernel_info_future
114 return self._kernel_info_future
115
115
116 def _handle_kernel_info_reply(self, msg):
116 def _handle_kernel_info_reply(self, msg):
117 """process the kernel_info_reply
117 """process the kernel_info_reply
118
118
119 enabling msg spec adaptation, if necessary
119 enabling msg spec adaptation, if necessary
120 """
120 """
121 idents,msg = self.session.feed_identities(msg)
121 idents,msg = self.session.feed_identities(msg)
122 try:
122 try:
123 msg = self.session.deserialize(msg)
123 msg = self.session.deserialize(msg)
124 except:
124 except:
125 self.log.error("Bad kernel_info reply", exc_info=True)
125 self.log.error("Bad kernel_info reply", exc_info=True)
126 self.request_kernel_info()
126 self._kernel_info_future.set_result(None)
127 return
127 return
128 else:
128 else:
129 info = msg['content']
129 info = msg['content']
130 self.log.debug("Received kernel info: %s", info)
130 self.log.debug("Received kernel info: %s", info)
131 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
131 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
132 self.log.error("Kernel info request failed, assuming current %s", info)
132 self.log.error("Kernel info request failed, assuming current %s", info)
133 else:
133 else:
134 kernel = self.kernel_manager.get_kernel(self.kernel_id)
134 kernel = self.kernel_manager.get_kernel(self.kernel_id)
135 kernel._kernel_info = info
135 kernel._kernel_info = info
136 self._finish_kernel_info(info)
136 self._finish_kernel_info(info)
137
137
138 # close the kernel_info channel, we don't need it anymore
138 # close the kernel_info channel, we don't need it anymore
139 if self.kernel_info_channel:
139 if self.kernel_info_channel:
140 self.kernel_info_channel.close()
140 self.kernel_info_channel.close()
141 self.kernel_info_channel = None
141 self.kernel_info_channel = None
142
142
143 def _finish_kernel_info(self, info):
143 def _finish_kernel_info(self, info):
144 """Finish handling kernel_info reply
144 """Finish handling kernel_info reply
145
145
146 Set up protocol adaptation, if needed,
146 Set up protocol adaptation, if needed,
147 and signal that connection can continue.
147 and signal that connection can continue.
148 """
148 """
149 protocol_version = info.get('protocol_version', kernel_protocol_version)
149 protocol_version = info.get('protocol_version', kernel_protocol_version)
150 if protocol_version != kernel_protocol_version:
150 if protocol_version != kernel_protocol_version:
151 self.session.adapt_version = int(protocol_version.split('.')[0])
151 self.session.adapt_version = int(protocol_version.split('.')[0])
152 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
152 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
153 self._kernel_info_future.set_result(info)
153 self._kernel_info_future.set_result(info)
154
154
155 def initialize(self):
155 def initialize(self):
156 super(ZMQChannelHandler, self).initialize()
156 super(ZMQChannelHandler, self).initialize()
157 self.zmq_stream = None
157 self.zmq_stream = None
158 self.kernel_id = None
158 self.kernel_id = None
159 self.kernel_info_channel = None
159 self.kernel_info_channel = None
160 self._kernel_info_future = Future()
160 self._kernel_info_future = Future()
161
161
162 @gen.coroutine
162 @gen.coroutine
163 def get(self, kernel_id):
163 def get(self, kernel_id):
164 self.kernel_id = cast_unicode(kernel_id, 'ascii')
164 self.kernel_id = cast_unicode(kernel_id, 'ascii')
165 yield self.request_kernel_info()
165 yield self.request_kernel_info()
166 super(ZMQChannelHandler, self).get(kernel_id)
166 super(ZMQChannelHandler, self).get(kernel_id)
167
167
168 def open(self, kernel_id):
168 def open(self, kernel_id):
169 super(ZMQChannelHandler, self).open()
169 super(ZMQChannelHandler, self).open()
170 try:
170 try:
171 self.create_stream()
171 self.create_stream()
172 except web.HTTPError as e:
172 except web.HTTPError as e:
173 self.log.error("Error opening stream: %s", e)
173 self.log.error("Error opening stream: %s", e)
174 # WebSockets don't response to traditional error codes so we
174 # WebSockets don't response to traditional error codes so we
175 # close the connection.
175 # close the connection.
176 if not self.stream.closed():
176 if not self.stream.closed():
177 self.stream.close()
177 self.stream.close()
178 self.close()
178 self.close()
179 else:
179 else:
180 self.zmq_stream.on_recv(self._on_zmq_reply)
180 self.zmq_stream.on_recv(self._on_zmq_reply)
181
181
182 def on_message(self, msg):
182 def on_message(self, msg):
183 if self.zmq_stream is None:
183 if self.zmq_stream is None:
184 return
184 return
185 elif self.zmq_stream.closed():
185 elif self.zmq_stream.closed():
186 self.log.info("%s closed, closing websocket.", self)
186 self.log.info("%s closed, closing websocket.", self)
187 self.close()
187 self.close()
188 return
188 return
189 if isinstance(msg, bytes):
189 if isinstance(msg, bytes):
190 msg = deserialize_binary_message(msg)
190 msg = deserialize_binary_message(msg)
191 else:
191 else:
192 msg = json.loads(msg)
192 msg = json.loads(msg)
193 self.session.send(self.zmq_stream, msg)
193 self.session.send(self.zmq_stream, msg)
194
194
195 def on_close(self):
195 def on_close(self):
196 # This method can be called twice, once by self.kernel_died and once
196 # This method can be called twice, once by self.kernel_died and once
197 # from the WebSocket close event. If the WebSocket connection is
197 # from the WebSocket close event. If the WebSocket connection is
198 # closed before the ZMQ streams are setup, they could be None.
198 # closed before the ZMQ streams are setup, they could be None.
199 if self.zmq_stream is not None and not self.zmq_stream.closed():
199 if self.zmq_stream is not None and not self.zmq_stream.closed():
200 self.zmq_stream.on_recv(None)
200 self.zmq_stream.on_recv(None)
201 # close the socket directly, don't wait for the stream
201 # close the socket directly, don't wait for the stream
202 socket = self.zmq_stream.socket
202 socket = self.zmq_stream.socket
203 self.zmq_stream.close()
203 self.zmq_stream.close()
204 socket.close()
204 socket.close()
205
205
206
206
207 class IOPubHandler(ZMQChannelHandler):
207 class IOPubHandler(ZMQChannelHandler):
208 channel = 'iopub'
208 channel = 'iopub'
209
209
210 def create_stream(self):
210 def create_stream(self):
211 super(IOPubHandler, self).create_stream()
211 super(IOPubHandler, self).create_stream()
212 km = self.kernel_manager
212 km = self.kernel_manager
213 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
213 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
214 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
214 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
215
215
216 def on_close(self):
216 def on_close(self):
217 km = self.kernel_manager
217 km = self.kernel_manager
218 if self.kernel_id in km:
218 if self.kernel_id in km:
219 km.remove_restart_callback(
219 km.remove_restart_callback(
220 self.kernel_id, self.on_kernel_restarted,
220 self.kernel_id, self.on_kernel_restarted,
221 )
221 )
222 km.remove_restart_callback(
222 km.remove_restart_callback(
223 self.kernel_id, self.on_restart_failed, 'dead',
223 self.kernel_id, self.on_restart_failed, 'dead',
224 )
224 )
225 super(IOPubHandler, self).on_close()
225 super(IOPubHandler, self).on_close()
226
226
227 def _send_status_message(self, status):
227 def _send_status_message(self, status):
228 msg = self.session.msg("status",
228 msg = self.session.msg("status",
229 {'execution_state': status}
229 {'execution_state': status}
230 )
230 )
231 self.write_message(json.dumps(msg, default=date_default))
231 self.write_message(json.dumps(msg, default=date_default))
232
232
233 def on_kernel_restarted(self):
233 def on_kernel_restarted(self):
234 logging.warn("kernel %s restarted", self.kernel_id)
234 logging.warn("kernel %s restarted", self.kernel_id)
235 self._send_status_message('restarting')
235 self._send_status_message('restarting')
236
236
237 def on_restart_failed(self):
237 def on_restart_failed(self):
238 logging.error("kernel %s restarted failed!", self.kernel_id)
238 logging.error("kernel %s restarted failed!", self.kernel_id)
239 self._send_status_message('dead')
239 self._send_status_message('dead')
240
240
241 def on_message(self, msg):
241 def on_message(self, msg):
242 """IOPub messages make no sense"""
242 """IOPub messages make no sense"""
243 pass
243 pass
244
244
245
245
246 class ShellHandler(ZMQChannelHandler):
246 class ShellHandler(ZMQChannelHandler):
247 channel = 'shell'
247 channel = 'shell'
248
248
249
249
250 class StdinHandler(ZMQChannelHandler):
250 class StdinHandler(ZMQChannelHandler):
251 channel = 'stdin'
251 channel = 'stdin'
252
252
253
253
254 #-----------------------------------------------------------------------------
254 #-----------------------------------------------------------------------------
255 # URL to handler mappings
255 # URL to handler mappings
256 #-----------------------------------------------------------------------------
256 #-----------------------------------------------------------------------------
257
257
258
258
259 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
259 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
260 _kernel_action_regex = r"(?P<action>restart|interrupt)"
260 _kernel_action_regex = r"(?P<action>restart|interrupt)"
261
261
262 default_handlers = [
262 default_handlers = [
263 (r"/api/kernels", MainKernelHandler),
263 (r"/api/kernels", MainKernelHandler),
264 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
264 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
265 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
265 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
266 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
266 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
267 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
267 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
268 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
268 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
269 ]
269 ]
General Comments 0
You need to be logged in to leave comments. Login now