##// END OF EJS Templates
Remove accident
Remove accident

File last commit:

r20183:08a21510
r20306:31b298a8
Show More
handlers.py
286 lines | 10.5 KiB | text/x-python | PythonLexer
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 """Tornado handlers for kernels."""
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Brian E. Granger
Adding new files.
r10641
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 import json
Brian E. Granger
Adding new files.
r10641 import logging
MinRK
cache kernel_info reply for protocol adaptation...
r18497 from tornado import gen, web
from tornado.concurrent import Future
Min RK
debugging websocket connections...
r18522 from tornado.ioloop import IOLoop
Brian E. Granger
Adding new files.
r10641
from IPython.utils.jsonutil import date_default
MinRK
cache kernel_info reply for protocol adaptation...
r18497 from IPython.utils.py3compat import cast_unicode
MinRK
escape URLs in Location headers
r13132 from IPython.html.utils import url_path_join, url_escape
Brian E. Granger
Adding new files.
r10641
Zachary Sailer
add error catching to kernel manager...
r13052 from ...base.handlers import IPythonHandler, json_errors
MinRK
support binary message from javascript
r18332 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
Brian E. Granger
Adding new files.
r10641
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 from IPython.core.release import kernel_protocol_version
Brian E. Granger
Adding new files.
r10641
class MainKernelHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def get(self):
km = self.kernel_manager
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(km.list_kernels()))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self):
MinRK
use default kernel name in kernels service...
r18308 km = self.kernel_manager
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221 model = self.get_json_body()
if model is None:
MinRK
use default kernel name in kernels service...
r18308 model = {
'name': km.default_kernel_name
}
else:
model.setdefault('name', km.default_kernel_name)
Thomas Kluyver
Add support for different kernel specs to kernels REST API
r17221
MinRK
use default kernel name in kernels service...
r18308 kernel_id = km.start_kernel(kernel_name=model['name'])
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
MinRK
escape URLs in Location headers
r13132 self.set_header('Location', url_escape(location))
Zachary Sailer
adding to test_kernels_api.py...
r13058 self.set_status(201)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
class KernelHandler(IPythonHandler):
Zachary Sailer
manual rebase services/kernels/
r12983 SUPPORTED_METHODS = ('DELETE', 'GET')
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Zachary Sailer
manual rebase services/kernels/
r12983 def get(self, kernel_id):
km = self.kernel_manager
Zachary Sailer
adding to test_kernels_api.py...
r13058 km._check_kernel_id(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.finish(json.dumps(model))
Brian E. Granger
Adding new files.
r10641
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def delete(self, kernel_id):
km = self.kernel_manager
km.shutdown_kernel(kernel_id)
self.set_status(204)
self.finish()
class KernelActionHandler(IPythonHandler):
@web.authenticated
Zachary Sailer
add error catching to kernel manager...
r13052 @json_errors
Brian E. Granger
Adding new files.
r10641 def post(self, kernel_id, action):
km = self.kernel_manager
if action == 'interrupt':
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
km.restart_kernel(kernel_id)
MinRK
remove websocket url...
r15400 model = km.kernel_model(kernel_id)
MinRK
remove base_kernel_url
r15310 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write(json.dumps(model))
Brian E. Granger
Adding new files.
r10641 self.finish()
Min RK
use single WebSocket connection for all channels...
r19824 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
Brian E. Granger
Adding new files.
r10641
Min RK
debugging websocket connections...
r18522 @property
def kernel_info_timeout(self):
return self.settings.get('kernel_info_timeout', 10)
MinRK
handle undefined or closed zmq_stream in on_message...
r17647 def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
Brian E. Granger
Adding new files.
r10641 def create_stream(self):
km = self.kernel_manager
Min RK
use single WebSocket connection for all channels...
r19824 identity = self.session.bsession
for channel in ('shell', 'iopub', 'stdin'):
meth = getattr(km, 'connect_' + channel)
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def request_kernel_info(self):
MinRK
update message spec adapter per review...
r16698 """send a request for kernel_info"""
MinRK
cache kernel_info reply for protocol adaptation...
r18497 km = self.kernel_manager
kernel = km.get_kernel(self.kernel_id)
try:
Min RK
actually send only one kernel_info request...
r18563 # check for previous request
future = kernel._kernel_info_future
MinRK
cache kernel_info reply for protocol adaptation...
r18497 except AttributeError:
self.log.debug("Requesting kernel info from %s", self.kernel_id)
# Create a kernel_info channel to query the kernel protocol version.
# This channel will be closed after the kernel_info reply is received.
if self.kernel_info_channel is None:
self.kernel_info_channel = km.connect_shell(self.kernel_id)
self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
self.session.send(self.kernel_info_channel, "kernel_info_request")
Min RK
actually send only one kernel_info request...
r18563 # store the future on the kernel, so only one request is sent
kernel._kernel_info_future = self._kernel_info_future
MinRK
cache kernel_info reply for protocol adaptation...
r18497 else:
Min RK
actually send only one kernel_info request...
r18563 if not future.done():
self.log.debug("Waiting for pending kernel_info request")
future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
MinRK
cache kernel_info reply for protocol adaptation...
r18497 return self._kernel_info_future
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697
MinRK
update message spec adapter per review...
r16698 def _handle_kernel_info_reply(self, msg):
"""process the kernel_info_reply
enabling msg spec adaptation, if necessary
"""
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 idents,msg = self.session.feed_identities(msg)
try:
MinRK
s/unserialize/deserialize
r18330 msg = self.session.deserialize(msg)
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 except:
self.log.error("Bad kernel_info reply", exc_info=True)
Min RK
actually send only one kernel_info request...
r18563 self._kernel_info_future.set_result({})
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 return
else:
MinRK
cache kernel_info reply for protocol adaptation...
r18497 info = msg['content']
self.log.debug("Received kernel info: %s", info)
if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
self.log.error("Kernel info request failed, assuming current %s", info)
Min RK
actually send only one kernel_info request...
r18563 info = {}
MinRK
cache kernel_info reply for protocol adaptation...
r18497 self._finish_kernel_info(info)
# close the kernel_info channel, we don't need it anymore
if self.kernel_info_channel:
self.kernel_info_channel.close()
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 self.kernel_info_channel = None
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def _finish_kernel_info(self, info):
"""Finish handling kernel_info reply
Set up protocol adaptation, if needed,
and signal that connection can continue.
"""
protocol_version = info.get('protocol_version', kernel_protocol_version)
if protocol_version != kernel_protocol_version:
self.session.adapt_version = int(protocol_version.split('.')[0])
self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
Min RK
debugging websocket connections...
r18522 if not self._kernel_info_future.done():
self._kernel_info_future.set_result(info)
MinRK
cache kernel_info reply for protocol adaptation...
r18497
MinRK
remove on_first_message authentication...
r18277 def initialize(self):
Min RK
use single WebSocket connection for all channels...
r19824 super(ZMQChannelsHandler, self).initialize()
Brian E. Granger
Adding new files.
r10641 self.zmq_stream = None
Min RK
use single WebSocket connection for all channels...
r19824 self.channels = {}
MinRK
add websocket workarounds for tornado 3...
r18498 self.kernel_id = None
MinRK
cache kernel_info reply for protocol adaptation...
r18497 self.kernel_info_channel = None
self._kernel_info_future = Future()
@gen.coroutine
Min RK
debugging websocket connections...
r18522 def pre_get(self):
# authenticate first
Min RK
use single WebSocket connection for all channels...
r19824 super(ZMQChannelsHandler, self).pre_get()
Min RK
debugging websocket connections...
r18522 # then request kernel info, waiting up to a certain time before giving up.
# We don't want to wait forever, because browsers don't take it well when
# servers never respond to websocket connection requests.
future = self.request_kernel_info()
def give_up():
"""Don't wait forever for the kernel to reply"""
if future.done():
return
self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
Min RK
actually send only one kernel_info request...
r18563 future.set_result({})
Min RK
debugging websocket connections...
r18522 loop = IOLoop.current()
loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
# actually wait for it
yield future
@gen.coroutine
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def get(self, kernel_id):
self.kernel_id = cast_unicode(kernel_id, 'ascii')
Min RK
use single WebSocket connection for all channels...
r19824 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
Brian E. Granger
Adding new files.
r10641
MinRK
remove on_first_message authentication...
r18277 def open(self, kernel_id):
Min RK
use single WebSocket connection for all channels...
r19824 super(ZMQChannelsHandler, self).open()
Brian E. Granger
Adding new files.
r10641 try:
self.create_stream()
MinRK
add websocket workarounds for tornado 3...
r18498 except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
Brian E. Granger
Adding new files.
r10641 # WebSockets don't response to traditional error codes so we
# close the connection.
Min RK
use single WebSocket connection for all channels...
r19824 for channel, stream in self.channels.items():
if not stream.closed():
stream.close()
Brian E. Granger
Adding new files.
r10641 self.close()
else:
Min RK
use single WebSocket connection for all channels...
r19824 for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)
Brian E. Granger
Adding new files.
r10641
def on_message(self, msg):
Min RK
protect websocket against errant messages...
r20183 if not self.channels:
# already closed, ignore the message
self.log.debug("Received message on closed websocket %r", msg)
return
MinRK
support binary message from javascript
r18332 if isinstance(msg, bytes):
msg = deserialize_binary_message(msg)
else:
msg = json.loads(msg)
Min RK
use single WebSocket connection for all channels...
r19824 channel = msg.pop('channel', None)
if channel is None:
self.log.warn("No channel specified, assuming shell: %s", msg)
channel = 'shell'
Min RK
protect websocket against errant messages...
r20183 if channel not in self.channels:
self.log.warn("No such channel: %r", channel)
return
Min RK
use single WebSocket connection for all channels...
r19824 stream = self.channels[channel]
self.session.send(stream, msg)
Brian E. Granger
Adding new files.
r10641
def on_close(self):
km = self.kernel_manager
if self.kernel_id in km:
km.remove_restart_callback(
self.kernel_id, self.on_kernel_restarted,
)
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)
Min RK
use single WebSocket connection for all channels...
r19824 # This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
for channel, stream in self.channels.items():
if stream is not None and not stream.closed():
stream.on_recv(None)
# close the socket directly, don't wait for the stream
socket = stream.socket
stream.close()
socket.close()
self.channels = {}
Brian E. Granger
Adding new files.
r10641 def _send_status_message(self, status):
msg = self.session.msg("status",
{'execution_state': status}
)
Min RK
use single WebSocket connection for all channels...
r19824 msg['channel'] = 'iopub'
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 self.write_message(json.dumps(msg, default=date_default))
Brian E. Granger
Adding new files.
r10641
def on_kernel_restarted(self):
logging.warn("kernel %s restarted", self.kernel_id)
self._send_status_message('restarting')
def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead')
Brian E. Granger
More work on the handlers
r10647
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
_kernel_action_regex = r"(?P<action>restart|interrupt)"
default_handlers = [
Zachary Sailer
manual rebase services/kernels/
r12983 (r"/api/kernels", MainKernelHandler),
(r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
(r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
Min RK
use single WebSocket connection for all channels...
r19824 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
Brian E. Granger
More work on the handlers
r10647 ]