##// END OF EJS Templates
3.2.0-maint
3.2.0-maint

File last commit:

r20427:6631fad7
r21129:27f1a73f
Show More
zmqhandlers.py
278 lines | 9.4 KiB | text/x-python | PythonLexer
MinRK
add websocket workarounds for tornado 3...
r18498 # coding: utf-8
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 """Tornado handlers for WebSocket <-> ZMQ sockets."""
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
MinRK
interrogate kernel_info to get protocol version for adaptation
r16697 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
Min RK
forward-port draft76 websockets...
r18737 import os
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021 import json
MinRK
support buffers in comm messages...
r18329 import struct
Min RK
forward-port draft76 websockets...
r18737 import warnings
MinRK
only use zmq.jsonapi when talking to zmq sockets...
r17021
Thomas Kluyver
Update imports for Python 3...
r13354 try:
Kyle Kelley
Indicate Py3 vs. Py2 codepath.
r14652 from urllib.parse import urlparse # Py 3
Kyle Kelley
Add Origin Checking.
r14646 except ImportError:
Kyle Kelley
Indicate Py3 vs. Py2 codepath.
r14652 from urlparse import urlparse # Py 2
Kyle Kelley
Add Origin Checking.
r14646
MinRK
make CORS configurable...
r17106 import tornado
Min RK
forward-port draft76 websockets...
r18737 from tornado import gen, ioloop, web
from tornado.websocket import WebSocketHandler
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
from IPython.kernel.zmq.session import Session
MinRK
test websocket-friendly binary message roundtrip...
r18335 from IPython.utils.jsonutil import date_default, extract_dates
MinRK
add websocket workarounds for tornado 3...
r18498 from IPython.utils.py3compat import cast_unicode
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
Brian E. Granger
Updating imports.
r10667 from .handlers import IPythonHandler
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
MinRK
support buffers in comm messages...
def serialize_binary_message(msg):
    """Pack a message dict and its binary buffers into one bytes blob.

    Layout of the result:

    - 4 bytes: number of parts (nbufs) as an unsigned 32-bit int
    - 4 * nbufs bytes: start offset of each part, unsigned 32-bit ints
    - the parts themselves: the JSON-encoded message first, then each
      entry of ``msg['buffers']`` in order

    All integers are in network byte order.  Offsets count from the start
    of the blob, so they include the header itself.

    Parameters
    ----------
    msg : dict
        Message with a 'buffers' entry holding the binary parts.
        The dict and its buffer list are left untouched.

    Returns
    -------
    bytes
        The serialized message.
    """
    # shallow copy: popping 'buffers' must not affect the caller's dict
    body = msg.copy()
    payloads = list(body.pop('buffers'))
    json_part = json.dumps(body, default=date_default).encode('utf8')
    parts = [json_part] + payloads
    count = len(parts)

    # the first part begins right after the (count + offsets) header
    position = 4 * (count + 1)
    offsets = [position]
    for part in parts[:-1]:
        position += len(part)
        offsets.append(position)

    header = struct.pack('!%dI' % (count + 1), count, *offsets)
    return b''.join([header] + parts)
MinRK
s/unserialize/deserialize
def deserialize_binary_message(bmsg):
    """Deserialize a message from a binary blob.

    Expected layout (all integers in network byte order):

    - 4 bytes: number of msg parts (nbufs) as an unsigned 32-bit int
    - 4 * nbufs bytes: start offset of each part, unsigned 32-bit ints
    - the parts: the JSON-encoded message first, then the raw buffers

    Offsets are from the start of the blob, including the header.

    Parameters
    ----------
    bmsg : bytes
        The serialized message.

    Returns
    -------
    dict
        The message dictionary, with 'header' and 'parent_header' passed
        through ``extract_dates`` and the trailing parts stored under
        'buffers'.

    Raises
    ------
    ValueError
        If the blob is too short to hold its header, or declares no parts.
    """
    total = len(bmsg)
    if total < 4:
        raise ValueError(
            "binary message too short for a header: %i bytes" % total
        )
    # unsigned, to match the 'I' format the serializer packs the count with
    nbufs = struct.unpack('!I', bmsg[:4])[0]
    header_len = 4 * (nbufs + 1)
    # The count comes straight off the wire.  Check it against the real
    # length *before* building the format string, otherwise a bogus count
    # could make us allocate a format string of up to several gigabytes.
    if nbufs < 1 or header_len > total:
        raise ValueError(
            "malformed binary message: header declares %i parts "
            "but message is only %i bytes" % (nbufs, total)
        )
    offsets = list(struct.unpack('!%dI' % nbufs, bmsg[4:header_len]))
    # None as the final stop makes the last part run to the end of the blob
    offsets.append(None)
    bufs = [bmsg[start:stop] for start, stop in zip(offsets[:-1], offsets[1:])]
    msg = json.loads(bufs[0].decode('utf8'))
    msg['header'] = extract_dates(msg['header'])
    msg['parent_header'] = extract_dates(msg['parent_header'])
    msg['buffers'] = bufs[1:]
    return msg
Min RK
forward-port draft76 websockets...
# ping interval for keeping websockets alive (30 seconds)
# The value is in milliseconds.
WS_PING_INTERVAL = 30000

# Opt-in switch, read once at import time: when the environment variable is
# set to a non-empty value, the WebSocketHandler name used by the handler
# classes defined below is rebound to the draft76-tolerant implementation
# and keep-alive pings are turned off.
if os.environ.get('IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS', False):
    warnings.warn("""Allowing draft76 websocket connections!
This should only be done for testing with phantomjs!""")
    # imported lazily so the module is only loaded when the switch is on
    from IPython.html import allow76
    WebSocketHandler = allow76.AllowDraftWebSocketHandler
    # draft 76 doesn't support ping
    WS_PING_INTERVAL = 0
class ZMQStreamHandler(WebSocketHandler):
    """WebSocket handler that forwards ZMQ replies to the browser.

    Replies are re-encoded as JSON text, or as a binary blob when the
    message carries buffers, and written to the websocket.
    """

    if tornado.version_info < (4, 1):
        # Backport send_error from tornado 4.1 to 4.0
        def send_error(self, *args, **kwargs):
            if self.stream is None:
                # deliberately skip WebSocketHandler in the MRO and use the
                # next send_error in line
                super(WebSocketHandler, self).send_error(*args, **kwargs)
            else:
                # If we get an uncaught exception during the handshake,
                # we have no choice but to abruptly close the connection.
                # TODO: for uncaught exceptions after the handshake,
                # we can close the connection more gracefully.
                self.stream.close()

    def check_origin(self, origin):
        """Check Origin == Host or Access-Control-Allow-Origin.

        Tornado >= 4 calls this method automatically, raising 403 if it
        returns False.

        Parameters
        ----------
        origin : str or None
            Value of the request's Origin header.

        Returns
        -------
        bool
            True if the connection may proceed, False to reject it.
        """
        if self.allow_origin == '*':
            return True

        host = self.request.headers.get("Host")

        # If no header is provided, assume we can't verify origin
        if origin is None:
            self.log.warning("Missing Origin header, rejecting WebSocket connection.")
            return False
        if host is None:
            self.log.warning("Missing Host header, rejecting WebSocket connection.")
            return False

        origin = origin.lower()
        origin_host = urlparse(origin).netloc

        # OK if origin matches host.  Host names are case-insensitive and the
        # origin was lower-cased above, so normalise the Host header the same
        # way; otherwise a same-origin request with a mixed-case Host header
        # would be rejected.
        if origin_host == host.lower():
            return True

        # Check CORS headers
        # NOTE(review): allow_origin is compared as configured against the
        # lower-cased origin -- confirm the setting is expected in lower case.
        if self.allow_origin:
            allow = self.allow_origin == origin
        elif self.allow_origin_pat:
            allow = bool(self.allow_origin_pat.match(origin))
        else:
            # No CORS headers deny the request
            allow = False
        if not allow:
            self.log.warning("Blocking Cross Origin WebSocket Attempt. Origin: %s, Host: %s",
                origin, host,
            )
        return allow

    def clear_cookie(self, *args, **kwargs):
        """meaningless for websockets"""
        pass

    def _reserialize_reply(self, msg_list, channel=None):
        """Reserialize a reply message using JSON.

        This takes the msg list from the ZMQ socket, deserializes it using
        self.session and then serializes the result using JSON. This method
        should be used by self._on_zmq_reply to build messages that can
        be sent back to the browser.

        Parameters
        ----------
        msg_list : list
            Raw message frames as received from the ZMQ socket.
        channel : str, optional
            If given, stored in the message under the 'channel' key.

        Returns
        -------
        bytes or unicode
            A binary blob when the message has buffers, JSON text otherwise.
        """
        idents, msg_list = self.session.feed_identities(msg_list)
        msg = self.session.deserialize(msg_list)
        if channel:
            msg['channel'] = channel
        if msg['buffers']:
            return serialize_binary_message(msg)
        smsg = json.dumps(msg, default=date_default)
        return cast_unicode(smsg)

    def _on_zmq_reply(self, stream, msg_list):
        """Forward one ZMQ reply to the websocket client.

        Messages that cannot be re-serialized are logged and dropped.
        """
        # Sometimes this gets triggered when the on_close method is scheduled in the
        # eventloop but hasn't been called.
        if self.stream.closed() or stream.closed():
            self.log.warning("zmq message arrived on closed channel")
            self.close()
            return
        channel = getattr(stream, 'channel', None)
        try:
            msg = self._reserialize_reply(msg_list, channel=channel)
        except Exception:
            # Hand msg_list to the logger as an argument: '%' formatting with
            # a tuple operand would raise TypeError inside this handler.
            self.log.critical("Malformed message: %r", msg_list, exc_info=True)
        else:
            self.write_message(msg, binary=isinstance(msg, bytes))
Brian E. Granger
Moving base ZMQ handlers to base/zmqhandlers.py.
r10653
class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
    """ZMQ websocket handler that authenticates before opening and keeps
    the connection alive with periodic pings.
    """

    # PeriodicCallback driving send_ping; created in open() when pings are on
    ping_callback = None
    # IOLoop times of the most recent ping sent / pong received
    last_ping = 0
    last_pong = 0

    @property
    def ping_interval(self):
        """The interval for websocket keep-alive pings.

        Set ws_ping_interval = 0 to disable pings.
        """
        return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)

    @property
    def ping_timeout(self):
        """If no pong is received in this many milliseconds,
        close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
        Default is max of 3 pings or 30 seconds.
        """
        return self.settings.get('ws_ping_timeout',
            max(3 * self.ping_interval, WS_PING_INTERVAL)
        )

    def set_default_headers(self):
        """Undo the set_default_headers in IPythonHandler

        which doesn't make sense for websockets
        """
        pass

    def pre_get(self):
        """Run before finishing the GET request

        Extend this method to add logic that should fire before
        the websocket finishes completing.

        Raises
        ------
        tornado.web.HTTPError
            403 if the request cannot be authenticated.
        """
        # authenticate the request before opening the websocket
        if self.get_current_user() is None:
            self.log.warning("Couldn't authenticate WebSocket connection")
            raise web.HTTPError(403)

        # look the argument up once instead of once to test and once to use
        session_id = self.get_argument('session_id', False)
        if session_id:
            self.session.session = cast_unicode(session_id)
        else:
            self.log.warning("No session ID specified")

    @gen.coroutine
    def get(self, *args, **kwargs):
        """Run pre_get, then continue with the websocket GET handling."""
        # pre_get can be a coroutine in subclasses
        # assign and yield in two step to avoid tornado 3 issues
        res = self.pre_get()
        yield gen.maybe_future(res)
        super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)

    def initialize(self):
        """Create the Session object used for this connection."""
        self.log.debug("Initializing websocket connection %s", self.request.path)
        self.session = Session(config=self.config)

    def open(self, *args, **kwargs):
        """Start the keep-alive ping timer, unless pings are disabled."""
        self.log.debug("Opening websocket %s", self.request.path)

        # start the pinging
        if self.ping_interval > 0:
            loop = ioloop.IOLoop.current()
            self.last_ping = loop.time()  # Remember time of last ping
            # start with pong == ping so the first timeout check measures
            # from the moment the socket was opened
            self.last_pong = self.last_ping
            self.ping_callback = ioloop.PeriodicCallback(
                self.send_ping, self.ping_interval, io_loop=loop,
            )
            self.ping_callback.start()

    def send_ping(self):
        """send a ping to keep the websocket alive"""
        if self.stream.closed() and self.ping_callback is not None:
            self.ping_callback.stop()
            return

        # check for timeout on pong.  Make sure that we really have sent a recent ping in
        # case the machine with both server and client has been suspended since the last ping.
        now = ioloop.IOLoop.current().time()
        # loop times are in seconds; intervals and timeouts are in milliseconds
        since_last_pong = 1e3 * (now - self.last_pong)
        since_last_ping = 1e3 * (now - self.last_ping)
        if since_last_ping < 2*self.ping_interval and since_last_pong > self.ping_timeout:
            self.log.warning("WebSocket ping timeout after %i ms.", since_last_pong)
            self.close()
            return

        self.ping(b'')
        self.last_ping = now

    def on_pong(self, data):
        """Record the time at which the latest pong arrived."""
        self.last_pong = ioloop.IOLoop.current().time()