##// END OF EJS Templates
Merge pull request #6765 from minrk/race-kernel-info...
Thomas Kluyver -
r18500:2857c1f0 merge
parent child Browse files
Show More
@@ -1,3 +1,4 b''
1 # coding: utf-8
1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2
3
3 # Copyright (c) IPython Development Team.
4 # Copyright (c) IPython Development Team.
@@ -11,12 +12,6 b' try:'
11 except ImportError:
12 except ImportError:
12 from urlparse import urlparse # Py 2
13 from urlparse import urlparse # Py 2
13
14
14 try:
15 from http.cookies import SimpleCookie # Py 3
16 except ImportError:
17 from Cookie import SimpleCookie # Py 2
18 import logging
19
20 import tornado
15 import tornado
21 from tornado import ioloop
16 from tornado import ioloop
22 from tornado import web
17 from tornado import web
@@ -24,7 +19,7 b' from tornado import websocket'
24
19
25 from IPython.kernel.zmq.session import Session
20 from IPython.kernel.zmq.session import Session
26 from IPython.utils.jsonutil import date_default, extract_dates
21 from IPython.utils.jsonutil import date_default, extract_dates
27 from IPython.utils.py3compat import PY3, cast_unicode
22 from IPython.utils.py3compat import cast_unicode
28
23
29 from .handlers import IPythonHandler
24 from .handlers import IPythonHandler
30
25
@@ -218,14 +213,21 b' class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):'
218 self.session.session = cast_unicode(self.get_argument('session_id'))
213 self.session.session = cast_unicode(self.get_argument('session_id'))
219 else:
214 else:
220 self.log.warn("No session ID specified")
215 self.log.warn("No session ID specified")
221
216 # FIXME: only do super get on tornado β‰₯ 4
222 return super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
217 # tornado 3 has no get, will raise 405
218 if tornado.version_info >= (4,):
219 return super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
223
220
224 def initialize(self):
221 def initialize(self):
225 self.session = Session(config=self.config)
222 self.session = Session(config=self.config)
226
223
227 def open(self, kernel_id):
224 def open(self, *args, **kwargs):
228 self.kernel_id = cast_unicode(kernel_id, 'ascii')
225 if tornado.version_info < (4,):
226 try:
227 self.get(*self.open_args, **self.open_kwargs)
228 except web.HTTPError:
229 self.close()
230 raise
229
231
230 # start the pinging
232 # start the pinging
231 if self.ping_interval > 0:
233 if self.ping_interval > 0:
@@ -5,10 +5,11 b''
5
5
6 import json
6 import json
7 import logging
7 import logging
8 from tornado import web
8 from tornado import gen, web
9 from tornado.concurrent import Future
9
10
10 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
11 from IPython.utils.py3compat import string_types
12 from IPython.utils.py3compat import cast_unicode
12 from IPython.html.utils import url_path_join, url_escape
13 from IPython.html.utils import url_path_join, url_escape
13
14
14 from ...base.handlers import IPythonHandler, json_errors
15 from ...base.handlers import IPythonHandler, json_errors
@@ -91,17 +92,26 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
91 km = self.kernel_manager
92 km = self.kernel_manager
92 meth = getattr(km, 'connect_%s' % self.channel)
93 meth = getattr(km, 'connect_%s' % self.channel)
93 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
94 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
94 # Create a kernel_info channel to query the kernel protocol version.
95 # This channel will be closed after the kernel_info reply is received.
96 self.kernel_info_channel = None
97 self.kernel_info_channel = km.connect_shell(self.kernel_id)
98 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
99 self._request_kernel_info()
100
95
101 def _request_kernel_info(self):
96 def request_kernel_info(self):
102 """send a request for kernel_info"""
97 """send a request for kernel_info"""
103 self.log.debug("requesting kernel info")
98 km = self.kernel_manager
104 self.session.send(self.kernel_info_channel, "kernel_info_request")
99 kernel = km.get_kernel(self.kernel_id)
100 try:
101 # check for cached value
102 kernel_info = kernel._kernel_info
103 except AttributeError:
104 self.log.debug("Requesting kernel info from %s", self.kernel_id)
105 # Create a kernel_info channel to query the kernel protocol version.
106 # This channel will be closed after the kernel_info reply is received.
107 if self.kernel_info_channel is None:
108 self.kernel_info_channel = km.connect_shell(self.kernel_id)
109 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
110 self.session.send(self.kernel_info_channel, "kernel_info_request")
111 else:
112 # use cached value, don't resend request
113 self._finish_kernel_info(kernel_info)
114 return self._kernel_info_future
105
115
106 def _handle_kernel_info_reply(self, msg):
116 def _handle_kernel_info_reply(self, msg):
107 """process the kernel_info_reply
117 """process the kernel_info_reply
@@ -113,28 +123,54 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
113 msg = self.session.deserialize(msg)
123 msg = self.session.deserialize(msg)
114 except:
124 except:
115 self.log.error("Bad kernel_info reply", exc_info=True)
125 self.log.error("Bad kernel_info reply", exc_info=True)
116 self._request_kernel_info()
126 self._kernel_info_future.set_result(None)
117 return
127 return
118 else:
128 else:
119 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
129 info = msg['content']
120 self.log.error("Kernel info request failed, assuming current %s", msg['content'])
130 self.log.debug("Received kernel info: %s", info)
131 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
132 self.log.error("Kernel info request failed, assuming current %s", info)
121 else:
133 else:
122 protocol_version = msg['content']['protocol_version']
134 kernel = self.kernel_manager.get_kernel(self.kernel_id)
123 if protocol_version != kernel_protocol_version:
135 kernel._kernel_info = info
124 self.session.adapt_version = int(protocol_version.split('.')[0])
136 self._finish_kernel_info(info)
125 self.log.info("adapting kernel to %s" % protocol_version)
137
126 self.kernel_info_channel.close()
138 # close the kernel_info channel, we don't need it anymore
139 if self.kernel_info_channel:
140 self.kernel_info_channel.close()
127 self.kernel_info_channel = None
141 self.kernel_info_channel = None
128
142
143 def _finish_kernel_info(self, info):
144 """Finish handling kernel_info reply
145
146 Set up protocol adaptation, if needed,
147 and signal that connection can continue.
148 """
149 protocol_version = info.get('protocol_version', kernel_protocol_version)
150 if protocol_version != kernel_protocol_version:
151 self.session.adapt_version = int(protocol_version.split('.')[0])
152 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
153 self._kernel_info_future.set_result(info)
154
129 def initialize(self):
155 def initialize(self):
130 super(ZMQChannelHandler, self).initialize()
156 super(ZMQChannelHandler, self).initialize()
131 self.zmq_stream = None
157 self.zmq_stream = None
158 self.kernel_id = None
159 self.kernel_info_channel = None
160 self._kernel_info_future = Future()
161
162 @gen.coroutine
163 def get(self, kernel_id):
164 self.kernel_id = cast_unicode(kernel_id, 'ascii')
165 yield self.request_kernel_info()
166 super(ZMQChannelHandler, self).get(kernel_id)
132
167
133 def open(self, kernel_id):
168 def open(self, kernel_id):
134 super(ZMQChannelHandler, self).open(kernel_id)
169 super(ZMQChannelHandler, self).open()
135 try:
170 try:
136 self.create_stream()
171 self.create_stream()
137 except web.HTTPError:
172 except web.HTTPError as e:
173 self.log.error("Error opening stream: %s", e)
138 # WebSockets don't response to traditional error codes so we
174 # WebSockets don't response to traditional error codes so we
139 # close the connection.
175 # close the connection.
140 if not self.stream.closed():
176 if not self.stream.closed():
General Comments 0
You need to be logged in to leave comments. Login now