handlers.py
294 lines
| 10.2 KiB
| text/x-python
|
PythonLexer
MinRK
|
r16697 | """Tornado handlers for kernels.""" | ||
Brian E. Granger
|
r10641 | |||
MinRK
|
r16697 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
Brian E. Granger
|
r10641 | |||
MinRK
|
r17021 | import json | ||
Brian E. Granger
|
r10641 | import logging | ||
MinRK
|
r18497 | from tornado import gen, web | ||
from tornado.concurrent import Future | ||||
Min RK
|
r18522 | from tornado.ioloop import IOLoop | ||
Brian E. Granger
|
r10641 | |||
from IPython.utils.jsonutil import date_default | ||||
MinRK
|
r18497 | from IPython.utils.py3compat import cast_unicode | ||
MinRK
|
r13132 | from IPython.html.utils import url_path_join, url_escape | ||
Brian E. Granger
|
r10641 | |||
Zachary Sailer
|
r13052 | from ...base.handlers import IPythonHandler, json_errors | ||
MinRK
|
r18332 | from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message | ||
Brian E. Granger
|
r10641 | |||
MinRK
|
r16697 | from IPython.core.release import kernel_protocol_version | ||
Brian E. Granger
|
r10641 | |||
class MainKernelHandler(IPythonHandler):
    """Collection-level handler: list the known kernels or start a new one."""

    @web.authenticated
    @json_errors
    def get(self):
        """Reply with the JSON encoding of the kernel manager's ``list_kernels()``."""
        kernels = self.kernel_manager.list_kernels()
        self.finish(json.dumps(kernels))

    @web.authenticated
    @json_errors
    def post(self):
        """Start a kernel and reply 201 with its model.

        The optional JSON request body may carry a ``name`` key selecting the
        kernel to start; when the body or the key is absent, the kernel
        manager's ``default_kernel_name`` is used.  The ``Location`` header
        is set to the URL of the new kernel.
        """
        manager = self.kernel_manager
        requested = self.get_json_body()
        if requested is None:
            requested = {}
        # Fill in the default only when the client did not choose a name.
        requested.setdefault('name', manager.default_kernel_name)

        new_id = manager.start_kernel(kernel_name=requested['name'])
        kernel_model = manager.kernel_model(new_id)
        kernel_url = url_path_join(self.base_url, 'api', 'kernels', new_id)
        self.set_header('Location', url_escape(kernel_url))
        self.set_status(201)
        self.finish(json.dumps(kernel_model))
Brian E. Granger
|
r10641 | |||
class KernelHandler(IPythonHandler):
    """Handler for one kernel, addressed by the ``kernel_id`` URL argument."""

    SUPPORTED_METHODS = ('DELETE', 'GET')

    @web.authenticated
    @json_errors
    def get(self, kernel_id):
        """Reply with the JSON model of the kernel ``kernel_id``."""
        manager = self.kernel_manager
        # NOTE(review): presumably raises for an unknown id -- confirm against
        # the kernel manager implementation.
        manager._check_kernel_id(kernel_id)
        self.finish(json.dumps(manager.kernel_model(kernel_id)))

    @web.authenticated
    @json_errors
    def delete(self, kernel_id):
        """Shut down the kernel ``kernel_id`` and reply 204 with an empty body."""
        self.kernel_manager.shutdown_kernel(kernel_id)
        self.set_status(204)
        self.finish()
class KernelActionHandler(IPythonHandler):
    """Handler for actions (``interrupt`` / ``restart``) on a single kernel."""

    @web.authenticated
    @json_errors
    def post(self, kernel_id, action):
        """Perform ``action`` on the kernel ``kernel_id``.

        ``interrupt`` interrupts the kernel and replies 204 with no body.
        ``restart`` restarts the kernel and replies with its JSON model and a
        ``Location`` header pointing at the kernel's URL.  Any other action
        value falls through and finishes the request with no further effect.
        """
        km = self.kernel_manager
        if action == 'interrupt':
            km.interrupt_kernel(kernel_id)
            self.set_status(204)
        elif action == 'restart':
            km.restart_kernel(kernel_id)
            model = km.kernel_model(kernel_id)
            # Build the URL the same way MainKernelHandler.post does, so the
            # header is correct whether or not base_url ends with a slash.
            location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
            self.set_header('Location', url_escape(location))
            self.write(json.dumps(model))
        self.finish()
class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
    """Websocket handler that relays messages to and from one kernel channel.

    Subclasses set the ``channel`` class attribute; ``create_stream`` uses it
    to pick the kernel manager's ``connect_<channel>`` method.  Before the
    websocket is opened the handler asks the kernel for its kernel_info so the
    session can adapt to the kernel's protocol version.
    """

    @property
    def kernel_info_timeout(self):
        """Seconds to wait for a kernel_info reply (``kernel_info_timeout`` setting, default 10)."""
        return self.settings.get('kernel_info_timeout', 10)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))

    def create_stream(self):
        """Connect to this handler's kernel channel and store the stream on ``zmq_stream``."""
        km = self.kernel_manager
        meth = getattr(km, 'connect_%s' % self.channel)
        self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)

    def request_kernel_info(self):
        """Send a request for kernel_info, unless a cached value exists.

        Returns the future that is resolved once kernel info is available
        (or once waiting for it has been abandoned).
        """
        km = self.kernel_manager
        kernel = km.get_kernel(self.kernel_id)
        try:
            # check for cached value
            kernel_info = kernel._kernel_info
        except AttributeError:
            self.log.debug("Requesting kernel info from %s", self.kernel_id)
            # Create a kernel_info channel to query the kernel protocol version.
            # This channel is closed after the kernel_info reply is received,
            # or when the websocket closes, whichever happens first.
            if self.kernel_info_channel is None:
                self.kernel_info_channel = km.connect_shell(self.kernel_id)
            self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
            self.session.send(self.kernel_info_channel, "kernel_info_request")
        else:
            # use cached value, don't resend request
            self._finish_kernel_info(kernel_info)
        return self._kernel_info_future

    def _close_kernel_info_channel(self):
        """Close the temporary kernel_info channel, if one is still open."""
        channel = self.kernel_info_channel
        if channel is not None and not channel.closed():
            channel.close()
        self.kernel_info_channel = None

    def _handle_kernel_info_reply(self, msg):
        """Process the kernel_info_reply,

        enabling msg spec adaptation, if necessary.
        """
        idents, msg = self.session.feed_identities(msg)
        try:
            msg = self.session.deserialize(msg)
        except Exception:
            self.log.error("Bad kernel_info reply", exc_info=True)
            # The timeout in pre_get may already have resolved the future.
            if not self._kernel_info_future.done():
                self._kernel_info_future.set_result(None)
        else:
            info = msg['content']
            self.log.debug("Received kernel info: %s", info)
            if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
                self.log.error("Kernel info request failed, assuming current %s", info)
            else:
                # Cache on the kernel so later connections skip the request.
                kernel = self.kernel_manager.get_kernel(self.kernel_id)
                kernel._kernel_info = info
            self._finish_kernel_info(info)

        # close the kernel_info channel, we don't need it anymore
        # (on the failure path as well, so the stream is not leaked)
        self._close_kernel_info_channel()

    def _finish_kernel_info(self, info):
        """Finish handling kernel_info reply

        Set up protocol adaptation, if needed,
        and signal that connection can continue.
        """
        protocol_version = info.get('protocol_version', kernel_protocol_version)
        if protocol_version != kernel_protocol_version:
            # Only the major version number is used for adaptation.
            self.session.adapt_version = int(protocol_version.split('.')[0])
            self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
        if not self._kernel_info_future.done():
            self._kernel_info_future.set_result(info)

    def initialize(self):
        """Reset per-connection state after the parent class has initialized."""
        super(ZMQChannelHandler, self).initialize()
        self.zmq_stream = None
        self.kernel_id = None
        self.kernel_info_channel = None
        self._kernel_info_future = Future()

    @gen.coroutine
    def pre_get(self):
        """Authenticate, then wait (with a timeout) for the kernel's kernel_info."""
        # authenticate first
        super(ZMQChannelHandler, self).pre_get()
        # then request kernel info, waiting up to a certain time before giving up.
        # We don't want to wait forever, because browsers don't take it well when
        # servers never respond to websocket connection requests.
        future = self.request_kernel_info()

        def give_up():
            """Don't wait forever for the kernel to reply"""
            if future.done():
                return
            self.log.warning("Timeout waiting for kernel_info reply from %s", self.kernel_id)
            future.set_result(None)

        loop = IOLoop.current()
        loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
        # actually wait for it
        yield future

    @gen.coroutine
    def get(self, kernel_id):
        """Record the kernel id, then defer to the parent class's ``get``."""
        self.kernel_id = cast_unicode(kernel_id, 'ascii')
        yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)

    def open(self, kernel_id):
        """Create the channel stream when the websocket opens; close the websocket on HTTPError."""
        super(ZMQChannelHandler, self).open()
        try:
            self.create_stream()
        except web.HTTPError as e:
            self.log.error("Error opening stream: %s", e)
            # WebSockets don't respond to traditional error codes so we
            # close the connection.
            if not self.stream.closed():
                self.stream.close()
            self.close()
        else:
            self.zmq_stream.on_recv(self._on_zmq_reply)

    def on_message(self, msg):
        """Forward a websocket message (binary or JSON text) to the kernel channel."""
        if self.zmq_stream is None:
            return
        elif self.zmq_stream.closed():
            self.log.info("%s closed, closing websocket.", self)
            self.close()
            return
        if isinstance(msg, bytes):
            msg = deserialize_binary_message(msg)
        else:
            msg = json.loads(msg)
        self.session.send(self.zmq_stream, msg)

    def on_close(self):
        """Release the ZMQ resources held by this connection."""
        # This method can be called twice, once by self.kernel_died and once
        # from the WebSocket close event. If the WebSocket connection is
        # closed before the ZMQ streams are setup, they could be None.
        if self.zmq_stream is not None and not self.zmq_stream.closed():
            self.zmq_stream.on_recv(None)
            # close the socket directly, don't wait for the stream
            socket = self.zmq_stream.socket
            self.zmq_stream.close()
            socket.close()
        # If the kernel never answered the kernel_info request, its channel
        # is still open; don't leak it past the lifetime of the websocket.
        self._close_kernel_info_channel()
Brian E. Granger
|
r10641 | |||
class IOPubHandler(ZMQChannelHandler): | ||||
channel = 'iopub' | ||||
def create_stream(self): | ||||
super(IOPubHandler, self).create_stream() | ||||
km = self.kernel_manager | ||||
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) | ||||
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') | ||||
def on_close(self): | ||||
km = self.kernel_manager | ||||
if self.kernel_id in km: | ||||
km.remove_restart_callback( | ||||
self.kernel_id, self.on_kernel_restarted, | ||||
) | ||||
km.remove_restart_callback( | ||||
self.kernel_id, self.on_restart_failed, 'dead', | ||||
) | ||||
super(IOPubHandler, self).on_close() | ||||
def _send_status_message(self, status): | ||||
msg = self.session.msg("status", | ||||
{'execution_state': status} | ||||
) | ||||
MinRK
|
r17021 | self.write_message(json.dumps(msg, default=date_default)) | ||
Brian E. Granger
|
r10641 | |||
def on_kernel_restarted(self): | ||||
logging.warn("kernel %s restarted", self.kernel_id) | ||||
self._send_status_message('restarting') | ||||
def on_restart_failed(self): | ||||
logging.error("kernel %s restarted failed!", self.kernel_id) | ||||
self._send_status_message('dead') | ||||
def on_message(self, msg): | ||||
"""IOPub messages make no sense""" | ||||
pass | ||||
Brian E. Granger
|
r10647 | |||
Brian E. Granger
|
class ShellHandler(ZMQChannelHandler):
    """Channel handler bound to the ``shell`` channel.

    All behaviour is inherited from ZMQChannelHandler; only the channel name,
    which selects the kernel manager's ``connect_shell`` method, is set here.
    """

    channel = 'shell'
Brian E. Granger
|
r10647 | |||
Brian E. Granger
|
class StdinHandler(ZMQChannelHandler):
    """Channel handler bound to the ``stdin`` channel.

    All behaviour is inherited from ZMQChannelHandler; only the channel name,
    which selects the kernel manager's ``connect_stdin`` method, is set here.
    """

    channel = 'stdin'
Brian E. Granger
|
r10647 | |||
#----------------------------------------------------------------------------- | ||||
# URL to handler mappings | ||||
#----------------------------------------------------------------------------- | ||||
# Kernel ids are matched as five dash-separated runs of word characters.
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"

# The only actions accepted by KernelActionHandler.
_kernel_action_regex = r"(?P<action>restart|interrupt)"

# (URL pattern, handler class) pairs for the kernels API.
default_handlers = [
    (r"/api/kernels", MainKernelHandler),
    (r"/api/kernels/" + _kernel_id_regex, KernelHandler),
    (r"/api/kernels/" + _kernel_id_regex + r"/" + _kernel_action_regex, KernelActionHandler),
    (r"/api/kernels/" + _kernel_id_regex + r"/iopub", IOPubHandler),
    (r"/api/kernels/" + _kernel_id_regex + r"/shell", ShellHandler),
    (r"/api/kernels/" + _kernel_id_regex + r"/stdin", StdinHandler),
]