##// END OF EJS Templates
Merge pull request #6110 from minrk/binarycomm...
Merge pull request #6110 from minrk/binarycomm support binary buffers in comm messages

File last commit:

r18332:4471a25c
r18418:2a8a2c87 merge
Show More
handlers.py
233 lines | 7.9 KiB | text/x-python | PythonLexer
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 """Tornado handlers for kernels."""
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Brian E. Granger
Adding new files.
r10641
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 import json
Brian E. Granger
Adding new files.
r10641 import logging
from tornado import web
from IPython.utils.jsonutil import date_default
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 from IPython.utils.py3compat import string_types
MinRK
escape URLs in Location headers
r13132 from IPython.html.utils import url_path_join, url_escape
Brian E. Granger
Adding new files.
r10641
Zachary Sailer
add error catching to kernel manager...
r13052 from ...base.handlers import IPythonHandler, json_errors
MinRK
support binary message from javascript
r18332 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 from IPython.core.release import kernel_protocol_version
Brian E. Granger
Adding new files.
r10641
class MainKernelHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def get(self):
km = self.kernel_manager
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(km.list_kernels()))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self):
MinRK
use default kernel name in kernels service...
r18308 km = self.kernel_manager
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221 model = self.get_json_body()
if model is None:
MinRK
use default kernel name in kernels service...
r18308 model = {
'name': km.default_kernel_name
}
else:
model.setdefault('name', km.default_kernel_name)
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221
MinRK
use default kernel name in kernels service...
r18308 kernel_id = km.start_kernel(kernel_name=model['name'])
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
MinRK
escape URLs in Location headers
r13132 self.set_header('Location', url_escape(location))
Zachary Sailer
adding to test_kernels_api.py...
r13058 self.set_status(201)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
class KernelHandler(IPythonHandler):
Zachary Sailer
manual rebase services/kernels/
r12983 SUPPORTED_METHODS = ('DELETE', 'GET')
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Zachary Sailer
manual rebase services/kernels/
r12983 def get(self, kernel_id):
km = self.kernel_manager
Zachary Sailer
adding to test_kernels_api.py...
r13058 km._check_kernel_id(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def delete(self, kernel_id):
km = self.kernel_manager
km.shutdown_kernel(kernel_id)
self.set_status(204)
self.finish()
class KernelActionHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self, kernel_id, action):
km = self.kernel_manager
if action == 'interrupt':
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
km.restart_kernel(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write(json.dumps(model))
Brian E. Granger
Adding new files.
r10641 self.finish()
class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
MinRK
handle undefined or closed zmq_stream in on_message...
r17647 def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
Brian E. Granger
Adding new files.
r10641 def create_stream(self):
km = self.kernel_manager
meth = getattr(km, 'connect_%s' % self.channel)
self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
MinRK
update message spec adapter per review...
r16698 # Create a kernel_info channel to query the kernel protocol version.
# This channel will be closed after the kernel_info reply is received.
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 self.kernel_info_channel = None
self.kernel_info_channel = km.connect_shell(self.kernel_id)
MinRK
update message spec adapter per review...
r16698 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 self._request_kernel_info()
def _request_kernel_info(self):
MinRK
update message spec adapter per review...
r16698 """send a request for kernel_info"""
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 self.log.debug("requesting kernel info")
self.session.send(self.kernel_info_channel, "kernel_info_request")
MinRK
update message spec adapter per review...
r16698 def _handle_kernel_info_reply(self, msg):
"""process the kernel_info_reply
enabling msg spec adaptation, if necessary
"""
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 idents,msg = self.session.feed_identities(msg)
try:
MinRK
s/unserialize/deserialize
r18330 msg = self.session.deserialize(msg)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 except:
self.log.error("Bad kernel_info reply", exc_info=True)
self._request_kernel_info()
return
else:
if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
self.log.error("Kernel info request failed, assuming current %s", msg['content'])
else:
protocol_version = msg['content']['protocol_version']
if protocol_version != kernel_protocol_version:
self.session.adapt_version = int(protocol_version.split('.')[0])
self.log.info("adapting kernel to %s" % protocol_version)
self.kernel_info_channel.close()
self.kernel_info_channel = None
MinRK
remove on_first_message authentication...
r18277 def initialize(self):
super(ZMQChannelHandler, self).initialize()
Brian E. Granger
Adding new files.
r10641 self.zmq_stream = None
MinRK
remove on_first_message authentication...
r18277 def open(self, kernel_id):
super(ZMQChannelHandler, self).open(kernel_id)
Brian E. Granger
Adding new files.
r10641 try:
self.create_stream()
except web.HTTPError:
# WebSockets don't response to traditional error codes so we
# close the connection.
if not self.stream.closed():
self.stream.close()
self.close()
else:
self.zmq_stream.on_recv(self._on_zmq_reply)
def on_message(self, msg):
MinRK
handle undefined or closed zmq_stream in on_message...
r17647 if self.zmq_stream is None:
return
elif self.zmq_stream.closed():
self.log.info("%s closed, closing websocket.", self)
self.close()
return
MinRK
support binary message from javascript
r18332 if isinstance(msg, bytes):
msg = deserialize_binary_message(msg)
else:
msg = json.loads(msg)
MinRK
remove max_msg_size altogether...
r11266 self.session.send(self.zmq_stream, msg)
Brian E. Granger
Adding new files.
r10641
def on_close(self):
# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
if self.zmq_stream is not None and not self.zmq_stream.closed():
self.zmq_stream.on_recv(None)
MinRK
cleanup socket cleanup...
r16526 # close the socket directly, don't wait for the stream
socket = self.zmq_stream.socket
Brian E. Granger
Adding new files.
r10641 self.zmq_stream.close()
MinRK
cleanup socket cleanup...
r16526 socket.close()
Brian E. Granger
Adding new files.
r10641
class IOPubHandler(ZMQChannelHandler):
channel = 'iopub'
def create_stream(self):
super(IOPubHandler, self).create_stream()
km = self.kernel_manager
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
def on_close(self):
km = self.kernel_manager
if self.kernel_id in km:
km.remove_restart_callback(
self.kernel_id, self.on_kernel_restarted,
)
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)
super(IOPubHandler, self).on_close()
def _send_status_message(self, status):
msg = self.session.msg("status",
{'execution_state': status}
)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write_message(json.dumps(msg, default=date_default))
Brian E. Granger
Adding new files.
r10641
def on_kernel_restarted(self):
logging.warn("kernel %s restarted", self.kernel_id)
self._send_status_message('restarting')
def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead')
def on_message(self, msg):
"""IOPub messages make no sense"""
pass
Brian E. Granger
More work on the handlers
r10647
Brian E. Granger
Adding new files.
r10641 class ShellHandler(ZMQChannelHandler):
channel = 'shell'
Brian E. Granger
More work on the handlers
r10647
Brian E. Granger
Adding new files.
r10641 class StdinHandler(ZMQChannelHandler):
channel = 'stdin'
Brian E. Granger
More work on the handlers
r10647
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
_kernel_action_regex = r"(?P<action>restart|interrupt)"
default_handlers = [
Zachary Sailer
manual rebase services/kernels/
r12983 (r"/api/kernels", MainKernelHandler),
(r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
(r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
(r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
(r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
(r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
Brian E. Granger
More work on the handlers
r10647 ]