handlers.py
233 lines
| 7.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r16697 | """Tornado handlers for kernels.""" | ||
Brian E. Granger
|
r10641 | |||
MinRK
|
r16697 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
Brian E. Granger
|
r10641 | |||
MinRK
|
r17021 | import json | ||
Brian E. Granger
|
r10641 | import logging | ||
from tornado import web | ||||
from IPython.utils.jsonutil import date_default | ||||
MinRK
|
r16697 | from IPython.utils.py3compat import string_types | ||
MinRK
|
r13132 | from IPython.html.utils import url_path_join, url_escape | ||
Brian E. Granger
|
r10641 | |||
Zachary Sailer
|
r13052 | from ...base.handlers import IPythonHandler, json_errors | ||
MinRK
|
r18332 | from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message | ||
Brian E. Granger
|
r10641 | |||
MinRK
|
r16697 | from IPython.core.release import kernel_protocol_version | ||
Brian E. Granger
|
r10641 | |||
class MainKernelHandler(IPythonHandler):
    """REST handler for the kernel collection: list kernels and start new ones."""

    @web.authenticated
    @json_errors
    def get(self):
        """Respond with the JSON-encoded result of ``kernel_manager.list_kernels()``."""
        kernels = self.kernel_manager.list_kernels()
        self.finish(json.dumps(kernels))

    @web.authenticated
    @json_errors
    def post(self):
        """Start a kernel and respond with its model.

        The optional JSON request body may carry a ``'name'`` key selecting
        the kernel to start; when the body or the key is missing, the kernel
        manager's ``default_kernel_name`` is used.  The response has status
        201, a ``Location`` header pointing at the new kernel's URL, and the
        JSON-encoded kernel model as its body.
        """
        km = self.kernel_manager

        request_model = self.get_json_body()
        if request_model is None:
            # No body was sent: start from an empty model.
            request_model = {}
        request_model.setdefault('name', km.default_kernel_name)

        kernel_id = km.start_kernel(kernel_name=request_model['name'])
        kernel_url = url_path_join(self.base_url, 'api', 'kernels', kernel_id)

        self.set_header('Location', url_escape(kernel_url))
        self.set_status(201)
        self.finish(json.dumps(km.kernel_model(kernel_id)))
Brian E. Granger
|
r10641 | |||
class KernelHandler(IPythonHandler):
    """REST handler for a single kernel: fetch its model or shut it down."""

    SUPPORTED_METHODS = ('DELETE', 'GET')

    @web.authenticated
    @json_errors
    def get(self, kernel_id):
        """Respond with the JSON-encoded model of the kernel ``kernel_id``.

        The id is first passed to the kernel manager's ``_check_kernel_id``
        (presumably raising for unknown ids -- its behaviour is defined
        outside this file).
        """
        manager = self.kernel_manager
        manager._check_kernel_id(kernel_id)
        self.finish(json.dumps(manager.kernel_model(kernel_id)))

    @web.authenticated
    @json_errors
    def delete(self, kernel_id):
        """Shut down the kernel ``kernel_id`` and respond with an empty 204."""
        self.kernel_manager.shutdown_kernel(kernel_id)
        self.set_status(204)
        self.finish()
class KernelActionHandler(IPythonHandler):
    """REST handler for actions on a running kernel (``interrupt``, ``restart``)."""

    @web.authenticated
    @json_errors
    def post(self, kernel_id, action):
        """Perform ``action`` on the kernel ``kernel_id``.

        * ``'interrupt'``: interrupt the kernel and respond with an empty 204.
        * ``'restart'``: restart the kernel and respond with its JSON-encoded
          model plus a ``Location`` header pointing at the kernel's URL.

        Any other value of ``action`` performs nothing and finishes the
        request with an empty body.
        """
        km = self.kernel_manager
        if action == 'interrupt':
            km.interrupt_kernel(kernel_id)
            self.set_status(204)
        elif action == 'restart':
            km.restart_kernel(kernel_id)
            model = km.kernel_model(kernel_id)
            # Build the URL the same way MainKernelHandler.post does, so the
            # header does not depend on base_url ending in a slash and is
            # escaped consistently.
            location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
            self.set_header('Location', url_escape(location))
            self.write(json.dumps(model))
        self.finish()
class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
    """WebSocket handler that relays messages to and from one kernel channel.

    Subclasses set the class attribute ``channel``; it selects which
    ``connect_<channel>`` method of the kernel manager is used to create the
    stream.
    """

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))

    def create_stream(self):
        """Connect the channel stream and request the kernel's protocol version."""
        km = self.kernel_manager
        meth = getattr(km, 'connect_%s' % self.channel)
        self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
        # Create a kernel_info channel to query the kernel protocol version.
        # This channel will be closed after the kernel_info reply is received
        # (or in on_close, if the websocket goes away first).
        self.kernel_info_channel = None
        self.kernel_info_channel = km.connect_shell(self.kernel_id)
        self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
        self._request_kernel_info()

    def _request_kernel_info(self):
        """Send a request for kernel_info."""
        self.log.debug("requesting kernel info")
        self.session.send(self.kernel_info_channel, "kernel_info_request")

    def _handle_kernel_info_reply(self, msg):
        """Process the kernel_info_reply.

        Enables message spec adaptation on the session when the kernel
        reports a protocol version different from ``kernel_protocol_version``.
        A reply that cannot be deserialized triggers a new request; any
        reply that can be deserialized closes the kernel_info channel.
        """
        idents, msg = self.session.feed_identities(msg)
        try:
            msg = self.session.deserialize(msg)
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
            self.log.error("Bad kernel_info reply", exc_info=True)
            self._request_kernel_info()
            return
        else:
            if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
                self.log.error("Kernel info request failed, assuming current %s", msg['content'])
            else:
                protocol_version = msg['content']['protocol_version']
                if protocol_version != kernel_protocol_version:
                    self.session.adapt_version = int(protocol_version.split('.')[0])
                    self.log.info("adapting kernel to %s", protocol_version)
        # The channel may already have been released by on_close.
        if self.kernel_info_channel is not None:
            self.kernel_info_channel.close()
        self.kernel_info_channel = None

    def initialize(self):
        """Initialise handler state; the streams are created later, in ``open``."""
        super(ZMQChannelHandler, self).initialize()
        self.zmq_stream = None
        self.kernel_info_channel = None

    def open(self, kernel_id):
        """Open the websocket and hook the channel stream up to it."""
        super(ZMQChannelHandler, self).open(kernel_id)
        try:
            self.create_stream()
        except web.HTTPError:
            # WebSockets don't respond to traditional error codes so we
            # close the connection.
            if not self.stream.closed():
                self.stream.close()
            self.close()
        else:
            self.zmq_stream.on_recv(self._on_zmq_reply)

    def on_message(self, msg):
        """Forward a websocket message to the kernel channel.

        Messages are dropped while no stream exists; if the stream has been
        closed, the websocket is closed as well.  Binary frames go through
        ``deserialize_binary_message``, text frames are parsed as JSON.
        """
        if self.zmq_stream is None:
            return
        elif self.zmq_stream.closed():
            self.log.info("%s closed, closing websocket.", self)
            self.close()
            return
        if isinstance(msg, bytes):
            msg = deserialize_binary_message(msg)
        else:
            msg = json.loads(msg)
        self.session.send(self.zmq_stream, msg)

    @staticmethod
    def _close_stream(stream):
        """Detach the receive callback from ``stream`` and close it and its socket.

        Does nothing when ``stream`` is None or already closed.
        """
        if stream is None or stream.closed():
            return
        stream.on_recv(None)
        # close the socket directly, don't wait for the stream
        socket = stream.socket
        stream.close()
        socket.close()

    def on_close(self):
        """Release the ZMQ streams owned by this handler."""
        # This method can be called twice, once by self.kernel_died and once
        # from the WebSocket close event. If the WebSocket connection is
        # closed before the ZMQ streams are setup, they could be None.
        self._close_stream(self.zmq_stream)
        # If the websocket closes before the kernel_info reply arrives,
        # nothing else would ever close the kernel_info channel.
        self._close_stream(getattr(self, 'kernel_info_channel', None))
        self.kernel_info_channel = None
Brian E. Granger
|
r10641 | |||
class IOPubHandler(ZMQChannelHandler):
    """Channel handler for ``iopub`` that also reports kernel restarts.

    In addition to relaying the channel, it registers restart callbacks with
    the kernel manager and forwards them to the client as ``status`` messages.
    """

    channel = 'iopub'

    def create_stream(self):
        """Create the streams and register the restart callbacks."""
        super(IOPubHandler, self).create_stream()
        km = self.kernel_manager
        km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
        km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

    def on_close(self):
        """Unregister the restart callbacks, then close the streams."""
        km = self.kernel_manager
        # Only unregister while the kernel manager still knows this kernel.
        if self.kernel_id in km:
            km.remove_restart_callback(
                self.kernel_id, self.on_kernel_restarted,
            )
            km.remove_restart_callback(
                self.kernel_id, self.on_restart_failed, 'dead',
            )
        super(IOPubHandler, self).on_close()

    def _send_status_message(self, status):
        """Send a ``status`` message with ``execution_state`` set to ``status``."""
        msg = self.session.msg("status",
            {'execution_state': status}
        )
        self.write_message(json.dumps(msg, default=date_default))

    def on_kernel_restarted(self):
        """Restart callback: log it and tell the client the kernel is restarting."""
        # logging.warn is a deprecated alias; use logging.warning.
        logging.warning("kernel %s restarted", self.kernel_id)
        self._send_status_message('restarting')

    def on_restart_failed(self):
        """'dead' callback: log it and tell the client the kernel is dead."""
        logging.error("kernel %s restarted failed!", self.kernel_id)
        self._send_status_message('dead')

    def on_message(self, msg):
        """IOPub messages make no sense"""
        pass
Brian E. Granger
|
r10647 | |||
Brian E. Granger
|
class ShellHandler(ZMQChannelHandler):
    """Channel handler bound to the kernel's ``shell`` channel."""

    channel = 'shell'
Brian E. Granger
|
r10647 | |||
Brian E. Granger
|
class StdinHandler(ZMQChannelHandler):
    """Channel handler bound to the kernel's ``stdin`` channel."""

    channel = 'stdin'
Brian E. Granger
|
r10647 | |||
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------

# Kernel ids are matched as five dash-separated word groups; actions are
# limited to the two names KernelActionHandler understands.
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
_kernel_action_regex = r"(?P<action>restart|interrupt)"

# Common prefix of every per-kernel URL.
_kernel_url = r"/api/kernels/" + _kernel_id_regex

default_handlers = [
    (r"/api/kernels", MainKernelHandler),
    (_kernel_url, KernelHandler),
    (_kernel_url + "/" + _kernel_action_regex, KernelActionHandler),
    (_kernel_url + "/iopub", IOPubHandler),
    (_kernel_url + "/shell", ShellHandler),
    (_kernel_url + "/stdin", StdinHandler),
]