##// END OF EJS Templates
Merge pull request #6831 from Carreau/fix-yield...
Merge pull request #6831 from Carreau/fix-yield Fix for Websocket.pre_get on tornado 3

File last commit:

r18522:a10ba6f7
r18562:241ef9c1 merge
Show More
handlers.py
294 lines | 10.2 KiB | text/x-python | PythonLexer
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 """Tornado handlers for kernels."""
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Brian E. Granger
Adding new files.
r10641
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 import json
Brian E. Granger
Adding new files.
r10641 import logging
MinRK
cache kernel_info reply for protocol adaptation...
r18497 from tornado import gen, web
from tornado.concurrent import Future
Min RK
debugging websocket connections...
r18522 from tornado.ioloop import IOLoop
Brian E. Granger
Adding new files.
r10641
from IPython.utils.jsonutil import date_default
MinRK
cache kernel_info reply for protocol adaptation...
r18497 from IPython.utils.py3compat import cast_unicode
MinRK
escape URLs in Location headers
r13132 from IPython.html.utils import url_path_join, url_escape
Brian E. Granger
Adding new files.
r10641
Zachary Sailer
add error catching to kernel manager...
r13052 from ...base.handlers import IPythonHandler, json_errors
MinRK
support binary message from javascript
r18332 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 from IPython.core.release import kernel_protocol_version
Brian E. Granger
Adding new files.
r10641
class MainKernelHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def get(self):
km = self.kernel_manager
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(km.list_kernels()))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self):
MinRK
use default kernel name in kernels service...
r18308 km = self.kernel_manager
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221 model = self.get_json_body()
if model is None:
MinRK
use default kernel name in kernels service...
r18308 model = {
'name': km.default_kernel_name
}
else:
model.setdefault('name', km.default_kernel_name)
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221
MinRK
use default kernel name in kernels service...
r18308 kernel_id = km.start_kernel(kernel_name=model['name'])
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
MinRK
escape URLs in Location headers
r13132 self.set_header('Location', url_escape(location))
Zachary Sailer
adding to test_kernels_api.py...
r13058 self.set_status(201)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
class KernelHandler(IPythonHandler):
Zachary Sailer
manual rebase services/kernels/
r12983 SUPPORTED_METHODS = ('DELETE', 'GET')
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Zachary Sailer
manual rebase services/kernels/
r12983 def get(self, kernel_id):
km = self.kernel_manager
Zachary Sailer
adding to test_kernels_api.py...
r13058 km._check_kernel_id(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def delete(self, kernel_id):
km = self.kernel_manager
km.shutdown_kernel(kernel_id)
self.set_status(204)
self.finish()
class KernelActionHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self, kernel_id, action):
km = self.kernel_manager
if action == 'interrupt':
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
km.restart_kernel(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write(json.dumps(model))
Brian E. Granger
Adding new files.
r10641 self.finish()
class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
Min RK
debugging websocket connections...
r18522 @property
def kernel_info_timeout(self):
return self.settings.get('kernel_info_timeout', 10)
MinRK
handle undefined or closed zmq_stream in on_message...
r17647 def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
Brian E. Granger
Adding new files.
r10641 def create_stream(self):
km = self.kernel_manager
meth = getattr(km, 'connect_%s' % self.channel)
self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def request_kernel_info(self):
MinRK
update message spec adapter per review...
r16698 """send a request for kernel_info"""
MinRK
cache kernel_info reply for protocol adaptation...
r18497 km = self.kernel_manager
kernel = km.get_kernel(self.kernel_id)
try:
# check for cached value
kernel_info = kernel._kernel_info
except AttributeError:
self.log.debug("Requesting kernel info from %s", self.kernel_id)
# Create a kernel_info channel to query the kernel protocol version.
# This channel will be closed after the kernel_info reply is received.
if self.kernel_info_channel is None:
self.kernel_info_channel = km.connect_shell(self.kernel_id)
self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
self.session.send(self.kernel_info_channel, "kernel_info_request")
else:
# use cached value, don't resend request
self._finish_kernel_info(kernel_info)
return self._kernel_info_future
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697
MinRK
update message spec adapter per review...
r16698 def _handle_kernel_info_reply(self, msg):
"""process the kernel_info_reply
enabling msg spec adaptation, if necessary
"""
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 idents,msg = self.session.feed_identities(msg)
try:
MinRK
s/unserialize/deserialize
r18330 msg = self.session.deserialize(msg)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 except:
self.log.error("Bad kernel_info reply", exc_info=True)
MinRK
Don't resend kernel info requests if a bad reply is received
r18499 self._kernel_info_future.set_result(None)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 return
else:
MinRK
cache kernel_info reply for protocol adaptation...
r18497 info = msg['content']
self.log.debug("Received kernel info: %s", info)
if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
self.log.error("Kernel info request failed, assuming current %s", info)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 else:
MinRK
cache kernel_info reply for protocol adaptation...
r18497 kernel = self.kernel_manager.get_kernel(self.kernel_id)
kernel._kernel_info = info
self._finish_kernel_info(info)
# close the kernel_info channel, we don't need it anymore
if self.kernel_info_channel:
self.kernel_info_channel.close()
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 self.kernel_info_channel = None
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def _finish_kernel_info(self, info):
"""Finish handling kernel_info reply
Set up protocol adaptation, if needed,
and signal that connection can continue.
"""
protocol_version = info.get('protocol_version', kernel_protocol_version)
if protocol_version != kernel_protocol_version:
self.session.adapt_version = int(protocol_version.split('.')[0])
self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
Min RK
debugging websocket connections...
r18522 if not self._kernel_info_future.done():
self._kernel_info_future.set_result(info)
MinRK
cache kernel_info reply for protocol adaptation...
r18497
MinRK
remove on_first_message authentication...
r18277 def initialize(self):
super(ZMQChannelHandler, self).initialize()
Brian E. Granger
Adding new files.
r10641 self.zmq_stream = None
MinRK
add websocket workarounds for tornado 3...
r18498 self.kernel_id = None
MinRK
cache kernel_info reply for protocol adaptation...
r18497 self.kernel_info_channel = None
self._kernel_info_future = Future()
@gen.coroutine
Min RK
debugging websocket connections...
r18522 def pre_get(self):
# authenticate first
super(ZMQChannelHandler, self).pre_get()
# then request kernel info, waiting up to a certain time before giving up.
# We don't want to wait forever, because browsers don't take it well when
# servers never respond to websocket connection requests.
future = self.request_kernel_info()
def give_up():
"""Don't wait forever for the kernel to reply"""
if future.done():
return
self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
future.set_result(None)
loop = IOLoop.current()
loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
# actually wait for it
yield future
@gen.coroutine
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def get(self, kernel_id):
self.kernel_id = cast_unicode(kernel_id, 'ascii')
Min RK
debugging websocket connections...
r18522 yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
Brian E. Granger
Adding new files.
r10641
MinRK
remove on_first_message authentication...
r18277 def open(self, kernel_id):
MinRK
cache kernel_info reply for protocol adaptation...
r18497 super(ZMQChannelHandler, self).open()
Brian E. Granger
Adding new files.
r10641 try:
self.create_stream()
MinRK
add websocket workarounds for tornado 3...
r18498 except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
Brian E. Granger
Adding new files.
r10641 # WebSockets don't response to traditional error codes so we
# close the connection.
if not self.stream.closed():
self.stream.close()
self.close()
else:
self.zmq_stream.on_recv(self._on_zmq_reply)
def on_message(self, msg):
MinRK
handle undefined or closed zmq_stream in on_message...
r17647 if self.zmq_stream is None:
return
elif self.zmq_stream.closed():
self.log.info("%s closed, closing websocket.", self)
self.close()
return
MinRK
support binary message from javascript
r18332 if isinstance(msg, bytes):
msg = deserialize_binary_message(msg)
else:
msg = json.loads(msg)
MinRK
remove max_msg_size altogether...
r11266 self.session.send(self.zmq_stream, msg)
Brian E. Granger
Adding new files.
r10641
def on_close(self):
# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
if self.zmq_stream is not None and not self.zmq_stream.closed():
self.zmq_stream.on_recv(None)
MinRK
cleanup socket cleanup...
r16526 # close the socket directly, don't wait for the stream
socket = self.zmq_stream.socket
Brian E. Granger
Adding new files.
r10641 self.zmq_stream.close()
MinRK
cleanup socket cleanup...
r16526 socket.close()
Brian E. Granger
Adding new files.
r10641
class IOPubHandler(ZMQChannelHandler):
channel = 'iopub'
def create_stream(self):
super(IOPubHandler, self).create_stream()
km = self.kernel_manager
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
def on_close(self):
km = self.kernel_manager
if self.kernel_id in km:
km.remove_restart_callback(
self.kernel_id, self.on_kernel_restarted,
)
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)
super(IOPubHandler, self).on_close()
def _send_status_message(self, status):
msg = self.session.msg("status",
{'execution_state': status}
)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write_message(json.dumps(msg, default=date_default))
Brian E. Granger
Adding new files.
r10641
def on_kernel_restarted(self):
logging.warn("kernel %s restarted", self.kernel_id)
self._send_status_message('restarting')
def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead')
def on_message(self, msg):
"""IOPub messages make no sense"""
pass
Brian E. Granger
More work on the handlers
r10647
Brian E. Granger
Adding new files.
r10641 class ShellHandler(ZMQChannelHandler):
channel = 'shell'
Brian E. Granger
More work on the handlers
r10647
Brian E. Granger
Adding new files.
r10641 class StdinHandler(ZMQChannelHandler):
channel = 'stdin'
Brian E. Granger
More work on the handlers
r10647
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
_kernel_action_regex = r"(?P<action>restart|interrupt)"
default_handlers = [
Zachary Sailer
manual rebase services/kernels/
r12983 (r"/api/kernels", MainKernelHandler),
(r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
(r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
(r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
(r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
(r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
Brian E. Granger
More work on the handlers
r10647 ]