##// END OF EJS Templates
Merge pull request #3 from minrk/in-process-weird...
Merge pull request #3 from minrk/in-process-weird remove problematic in-process kernel behavior

File last commit:

r20456:4abb13b9
r20792:87b389e6 merge
Show More
zmqhandlers.py
281 lines | 9.5 KiB | text/x-python | PythonLexer
MinRK
add websocket workarounds for tornado 3...
r18498 # coding: utf-8
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 """Tornado handlers for WebSocket <-> ZMQ sockets."""
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
Min RK
forward-port draft76 websockets...
r18737 import os
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 import json
MinRK
support buffers in comm messages...
r18329 import struct
Min RK
forward-port draft76 websockets...
r18737 import warnings
Jason Grout
Add check to skip work in versions past 3.4
r20456 import sys
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021
Thomas Kluyver
Update imports for Python 3...
r13354 try:
Kyle Kelley
Indicate Py3 vs. Py2 codepath.
r14652 from urllib.parse import urlparse # Py 3
Kyle Kelley
Add Origin Checking.
r14646 except ImportError:
Kyle Kelley
Indicate Py3 vs. Py2 codepath.
r14652 from urlparse import urlparse # Py 2
Kyle Kelley
Add Origin Checking.
r14646
MinRK
make CORS configurable...
r17106 import tornado
Min RK
forward-port draft76 websockets...
r18737 from tornado import gen, ioloop, web
from tornado.websocket import WebSocketHandler
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
from IPython.kernel.zmq.session import Session
MinRK
test websocket-friendly binary message roundtrip...
r18335 from IPython.utils.jsonutil import date_default, extract_dates
MinRK
add websocket workarounds for tornado 3...
r18498 from IPython.utils.py3compat import cast_unicode
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
Brian E. Granger
Updating imports.
r10667 from .handlers import IPythonHandler
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
MinRK
support buffers in comm messages...
r18329 def serialize_binary_message(msg):
"""serialize a message as a binary blob
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
The message serialized to bytes.
"""
MinRK
test websocket-friendly binary message roundtrip...
r18335 # don't modify msg or buffer list in-place
msg = msg.copy()
buffers = list(msg.pop('buffers'))
Jason Grout
Add check to skip work in versions past 3.4
r20456 if sys.version_info < (3, 4):
buffers = [x.tobytes() for x in buffers]
MinRK
support buffers in comm messages...
r18329 bmsg = json.dumps(msg, default=date_default).encode('utf8')
buffers.insert(0, bmsg)
nbufs = len(buffers)
offsets = [4 * (nbufs + 1)]
for buf in buffers[:-1]:
offsets.append(offsets[-1] + len(buf))
MinRK
unsigned ints for offsets...
r18338 offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
MinRK
support buffers in comm messages...
r18329 buffers.insert(0, offsets_buf)
return b''.join(buffers)
MinRK
s/unserialize/deserialize
r18330 def deserialize_binary_message(bmsg):
"""deserialize a message from a binary blog
MinRK
support buffers in comm messages...
r18329
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
message dictionary
"""
MinRK
test websocket-friendly binary message roundtrip...
r18335 nbufs = struct.unpack('!i', bmsg[:4])[0]
MinRK
unsigned ints for offsets...
r18338 offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
MinRK
support buffers in comm messages...
r18329 offsets.append(None)
bufs = []
for start, stop in zip(offsets[:-1], offsets[1:]):
bufs.append(bmsg[start:stop])
MinRK
test websocket-friendly binary message roundtrip...
r18335 msg = json.loads(bufs[0].decode('utf8'))
msg['header'] = extract_dates(msg['header'])
msg['parent_header'] = extract_dates(msg['parent_header'])
MinRK
support buffers in comm messages...
r18329 msg['buffers'] = bufs[1:]
return msg
Min RK
forward-port draft76 websockets...
r18737 # ping interval for keeping websockets alive (30 seconds)
WS_PING_INTERVAL = 30000
MinRK
support buffers in comm messages...
r18329
Min RK
forward-port draft76 websockets...
r18737 if os.environ.get('IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS', False):
warnings.warn("""Allowing draft76 websocket connections!
This should only be done for testing with phantomjs!""")
from IPython.html import allow76
WebSocketHandler = allow76.AllowDraftWebSocketHandler
# draft 76 doesn't support ping
WS_PING_INTERVAL = 0
class ZMQStreamHandler(WebSocketHandler):
MinRK
make CORS configurable...
r17106
Min RK
backport WebSocket.send_error from tornado 4.1...
r20426 if tornado.version_info < (4,1):
"""Backport send_error from tornado 4.1 to 4.0"""
def send_error(self, *args, **kwargs):
if self.stream is None:
super(WebSocketHandler, self).send_error(*args, **kwargs)
else:
# If we get an uncaught exception during the handshake,
# we have no choice but to abruptly close the connection.
# TODO: for uncaught exceptions after the handshake,
# we can close the connection more gracefully.
self.stream.close()
Min RK
handle message arriving when sockets are closed...
r20427
MinRK
make CORS configurable...
r17106 def check_origin(self, origin):
MinRK
s/cors_/allow_/...
r17116 """Check Origin == Host or Access-Control-Allow-Origin.
Tornado >= 4 calls this method automatically, raising 403 if it returns False.
"""
if self.allow_origin == '*':
MinRK
make CORS configurable...
r17106 return True
Kyle Kelley
Handle variations of name for origin
r14732
Kyle Kelley
Performing check only on open.
r14700 host = self.request.headers.get("Host")
Kyle Kelley
Add Origin Checking.
r14646
Kyle Kelley
Name change to same_origin
r14703 # If no header is provided, assume we can't verify origin
MinRK
better log messages when rejecting cross-origin requests
r17881 if origin is None:
self.log.warn("Missing Origin header, rejecting WebSocket connection.")
return False
if host is None:
self.log.warn("Missing Host header, rejecting WebSocket connection.")
MinRK
make CORS configurable...
r17106 return False
MinRK
only compare host:port in Websocket.check_origin...
r17851 origin = origin.lower()
origin_host = urlparse(origin).netloc
MinRK
make CORS configurable...
r17106
# OK if origin matches host
MinRK
only compare host:port in Websocket.check_origin...
r17851 if origin_host == host:
MinRK
make CORS configurable...
r17106 return True
# Check CORS headers
MinRK
s/cors_/allow_/...
r17116 if self.allow_origin:
MinRK
better log messages when rejecting cross-origin requests
r17881 allow = self.allow_origin == origin
MinRK
s/cors_/allow_/...
r17116 elif self.allow_origin_pat:
MinRK
better log messages when rejecting cross-origin requests
r17881 allow = bool(self.allow_origin_pat.match(origin))
MinRK
make CORS configurable...
r17106 else:
MinRK
s/cors_/allow_/...
r17116 # No CORS headers deny the request
MinRK
better log messages when rejecting cross-origin requests
r17881 allow = False
if not allow:
self.log.warn("Blocking Cross Origin WebSocket Attempt. Origin: %s, Host: %s",
origin, host,
)
return allow
Kyle Kelley
Add Origin Checking.
r14646
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 def clear_cookie(self, *args, **kwargs):
"""meaningless for websockets"""
pass
Min RK
use single WebSocket connection for all channels...
r19824 def _reserialize_reply(self, msg_list, channel=None):
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 """Reserialize a reply message using JSON.
MinRK
s/unserialize/deserialize
r18330 This takes the msg list from the ZMQ socket, deserializes it using
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 self.session and then serializes the result using JSON. This method
should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.
"""
idents, msg_list = self.session.feed_identities(msg_list)
MinRK
s/unserialize/deserialize
r18330 msg = self.session.deserialize(msg_list)
Min RK
use single WebSocket connection for all channels...
r19824 if channel:
msg['channel'] = channel
MinRK
support buffers in comm messages...
r18329 if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
Min RK
use single WebSocket connection for all channels...
r19824 def _on_zmq_reply(self, stream, msg_list):
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 # Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
Min RK
handle message arriving when sockets are closed...
r20427 if self.stream.closed() or stream.closed():
self.log.warn("zmq message arrived on closed channel")
self.close()
return
Min RK
use single WebSocket connection for all channels...
r19824 channel = getattr(stream, 'channel', None)
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 try:
Min RK
use single WebSocket connection for all channels...
r19824 msg = self._reserialize_reply(msg_list, channel=channel)
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653 except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
MinRK
support buffers in comm messages...
r18329 self.write_message(msg, binary=isinstance(msg, bytes))
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
MinRK
send ping every 30 seconds to keep websockets alive
r17341 ping_callback = None
Richard Everson
Check time of last ping before timing out a missing pong.
r17841 last_ping = 0
MinRK
close websocket connections on ping/pong timeout...
r17635 last_pong = 0
@property
def ping_interval(self):
"""The interval for websocket keep-alive pings.
Set ws_ping_interval = 0 to disable pings.
"""
return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)
@property
def ping_timeout(self):
"""If no ping is received in this many milliseconds,
close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
Default is max of 3 pings or 30 seconds.
"""
return self.settings.get('ws_ping_timeout',
max(3 * self.ping_interval, WS_PING_INTERVAL)
)
MinRK
send ping every 30 seconds to keep websockets alive
r17341
MinRK
make CORS configurable...
r17106 def set_default_headers(self):
"""Undo the set_default_headers in IPythonHandler
which doesn't make sense for websockets
"""
pass
MinRK
remove on_first_message authentication...
r18277
Min RK
debugging websocket connections...
r18522 def pre_get(self):
"""Run before finishing the GET request
Extend this method to add logic that should fire before
the websocket finishes completing.
"""
MinRK
remove on_first_message authentication...
r18277 # authenticate the request before opening the websocket
if self.get_current_user() is None:
self.log.warn("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)
MinRK
allow session_id to be undefined when starting kernel channels
r18307 if self.get_argument('session_id', False):
MinRK
remove on_first_message authentication...
r18277 self.session.session = cast_unicode(self.get_argument('session_id'))
else:
self.log.warn("No session ID specified")
Min RK
debugging websocket connections...
r18522
@gen.coroutine
def get(self, *args, **kwargs):
# pre_get can be a coroutine in subclasses
Matthias Bussonnier
Get pre_get to work and make session logs when adapter changes
r18561 # assign and yield in two step to avoid tornado 3 issues
res = self.pre_get()
yield gen.maybe_future(res)
Min RK
bump minimum tornado version to 4.0...
r18739 super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
MinRK
remove on_first_message authentication...
r18277
def initialize(self):
Min RK
debugging websocket connections...
r18522 self.log.debug("Initializing websocket connection %s", self.request.path)
MinRK
don't use parent=self in handlers...
r11105 self.session = Session(config=self.config)
MinRK
remove on_first_message authentication...
r18277
MinRK
cache kernel_info reply for protocol adaptation...
r18497 def open(self, *args, **kwargs):
Min RK
debugging websocket connections...
r18522 self.log.debug("Opening websocket %s", self.request.path)
MinRK
close websocket connections on ping/pong timeout...
r17635
# start the pinging
if self.ping_interval > 0:
MinRK
use IOLoop.current in a few places...
r19347 loop = ioloop.IOLoop.current()
self.last_ping = loop.time() # Remember time of last ping
Richard Everson
Check time of last ping before timing out a missing pong.
r17841 self.last_pong = self.last_ping
MinRK
use IOLoop.current in a few places...
r19347 self.ping_callback = ioloop.PeriodicCallback(
self.send_ping, self.ping_interval, io_loop=loop,
)
MinRK
close websocket connections on ping/pong timeout...
r17635 self.ping_callback.start()
MinRK
send ping every 30 seconds to keep websockets alive
r17341
def send_ping(self):
"""send a ping to keep the websocket alive"""
if self.stream.closed() and self.ping_callback is not None:
self.ping_callback.stop()
return
MinRK
close websocket connections on ping/pong timeout...
r17635
Richard Everson
Check time of last ping before timing out a missing pong.
r17841 # check for timeout on pong. Make sure that we really have sent a recent ping in
# case the machine with both server and client has been suspended since the last ping.
MinRK
use IOLoop.current in a few places...
r19347 now = ioloop.IOLoop.current().time()
Richard Everson
Check time of last ping before timing out a missing pong.
r17841 since_last_pong = 1e3 * (now - self.last_pong)
since_last_ping = 1e3 * (now - self.last_ping)
if since_last_ping < 2*self.ping_interval and since_last_pong > self.ping_timeout:
MinRK
close websocket connections on ping/pong timeout...
r17635 self.log.warn("WebSocket ping timeout after %i ms.", since_last_pong)
self.close()
return
MinRK
send ping every 30 seconds to keep websockets alive
r17341
self.ping(b'')
Richard Everson
Check time of last ping before timing out a missing pong.
r17841 self.last_ping = now
MinRK
close websocket connections on ping/pong timeout...
r17635 def on_pong(self, data):
MinRK
use IOLoop.current in a few places...
r19347 self.last_pong = ioloop.IOLoop.current().time()