##// END OF EJS Templates
Add check to skip work in versions past 3.4
Jason Grout -
Show More
@@ -1,280 +1,281 b''
1 1 # coding: utf-8
2 2 """Tornado handlers for WebSocket <-> ZMQ sockets."""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 import os
8 8 import json
9 9 import struct
10 10 import warnings
11 import sys
11 12
12 13 try:
13 14 from urllib.parse import urlparse # Py 3
14 15 except ImportError:
15 16 from urlparse import urlparse # Py 2
16 17
17 18 import tornado
18 19 from tornado import gen, ioloop, web
19 20 from tornado.websocket import WebSocketHandler
20 21
21 22 from IPython.kernel.zmq.session import Session
22 23 from IPython.utils.jsonutil import date_default, extract_dates
23 24 from IPython.utils.py3compat import cast_unicode
24 25
25 26 from .handlers import IPythonHandler
26 27
27 28 def serialize_binary_message(msg):
28 29 """serialize a message as a binary blob
29 30
30 31 Header:
31 32
32 33 4 bytes: number of msg parts (nbufs) as 32b int
33 34 4 * nbufs bytes: offset for each buffer as integer as 32b int
34 35
35 36 Offsets are from the start of the buffer, including the header.
36 37
37 38 Returns
38 39 -------
39 40
40 41 The message serialized to bytes.
41 42
42 43 """
43 44 # don't modify msg or buffer list in-place
44 45 msg = msg.copy()
45 46 buffers = list(msg.pop('buffers'))
46 # for python 2, copy the buffer memoryviews to byte strings
47 buffers = [x.tobytes() for x in buffers]
47 if sys.version_info < (3, 4):
48 buffers = [x.tobytes() for x in buffers]
48 49 bmsg = json.dumps(msg, default=date_default).encode('utf8')
49 50 buffers.insert(0, bmsg)
50 51 nbufs = len(buffers)
51 52 offsets = [4 * (nbufs + 1)]
52 53 for buf in buffers[:-1]:
53 54 offsets.append(offsets[-1] + len(buf))
54 55 offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
55 56 buffers.insert(0, offsets_buf)
56 57 return b''.join(buffers)
57 58
58 59
59 60 def deserialize_binary_message(bmsg):
60 61 """deserialize a message from a binary blog
61 62
62 63 Header:
63 64
64 65 4 bytes: number of msg parts (nbufs) as 32b int
65 66 4 * nbufs bytes: offset for each buffer as integer as 32b int
66 67
67 68 Offsets are from the start of the buffer, including the header.
68 69
69 70 Returns
70 71 -------
71 72
72 73 message dictionary
73 74 """
74 75 nbufs = struct.unpack('!i', bmsg[:4])[0]
75 76 offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
76 77 offsets.append(None)
77 78 bufs = []
78 79 for start, stop in zip(offsets[:-1], offsets[1:]):
79 80 bufs.append(bmsg[start:stop])
80 81 msg = json.loads(bufs[0].decode('utf8'))
81 82 msg['header'] = extract_dates(msg['header'])
82 83 msg['parent_header'] = extract_dates(msg['parent_header'])
83 84 msg['buffers'] = bufs[1:]
84 85 return msg
85 86
86 87 # ping interval for keeping websockets alive (30 seconds)
87 88 WS_PING_INTERVAL = 30000
88 89
89 90 if os.environ.get('IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS', False):
90 91 warnings.warn("""Allowing draft76 websocket connections!
91 92 This should only be done for testing with phantomjs!""")
92 93 from IPython.html import allow76
93 94 WebSocketHandler = allow76.AllowDraftWebSocketHandler
94 95 # draft 76 doesn't support ping
95 96 WS_PING_INTERVAL = 0
96 97
97 98 class ZMQStreamHandler(WebSocketHandler):
98 99
99 100 if tornado.version_info < (4,1):
100 101 """Backport send_error from tornado 4.1 to 4.0"""
101 102 def send_error(self, *args, **kwargs):
102 103 if self.stream is None:
103 104 super(WebSocketHandler, self).send_error(*args, **kwargs)
104 105 else:
105 106 # If we get an uncaught exception during the handshake,
106 107 # we have no choice but to abruptly close the connection.
107 108 # TODO: for uncaught exceptions after the handshake,
108 109 # we can close the connection more gracefully.
109 110 self.stream.close()
110 111
111 112
112 113 def check_origin(self, origin):
113 114 """Check Origin == Host or Access-Control-Allow-Origin.
114 115
115 116 Tornado >= 4 calls this method automatically, raising 403 if it returns False.
116 117 """
117 118 if self.allow_origin == '*':
118 119 return True
119 120
120 121 host = self.request.headers.get("Host")
121 122
122 123 # If no header is provided, assume we can't verify origin
123 124 if origin is None:
124 125 self.log.warn("Missing Origin header, rejecting WebSocket connection.")
125 126 return False
126 127 if host is None:
127 128 self.log.warn("Missing Host header, rejecting WebSocket connection.")
128 129 return False
129 130
130 131 origin = origin.lower()
131 132 origin_host = urlparse(origin).netloc
132 133
133 134 # OK if origin matches host
134 135 if origin_host == host:
135 136 return True
136 137
137 138 # Check CORS headers
138 139 if self.allow_origin:
139 140 allow = self.allow_origin == origin
140 141 elif self.allow_origin_pat:
141 142 allow = bool(self.allow_origin_pat.match(origin))
142 143 else:
143 144 # No CORS headers deny the request
144 145 allow = False
145 146 if not allow:
146 147 self.log.warn("Blocking Cross Origin WebSocket Attempt. Origin: %s, Host: %s",
147 148 origin, host,
148 149 )
149 150 return allow
150 151
151 152 def clear_cookie(self, *args, **kwargs):
152 153 """meaningless for websockets"""
153 154 pass
154 155
155 156 def _reserialize_reply(self, msg_list, channel=None):
156 157 """Reserialize a reply message using JSON.
157 158
158 159 This takes the msg list from the ZMQ socket, deserializes it using
159 160 self.session and then serializes the result using JSON. This method
160 161 should be used by self._on_zmq_reply to build messages that can
161 162 be sent back to the browser.
162 163 """
163 164 idents, msg_list = self.session.feed_identities(msg_list)
164 165 msg = self.session.deserialize(msg_list)
165 166 if channel:
166 167 msg['channel'] = channel
167 168 if msg['buffers']:
168 169 buf = serialize_binary_message(msg)
169 170 return buf
170 171 else:
171 172 smsg = json.dumps(msg, default=date_default)
172 173 return cast_unicode(smsg)
173 174
174 175 def _on_zmq_reply(self, stream, msg_list):
175 176 # Sometimes this gets triggered when the on_close method is scheduled in the
176 177 # eventloop but hasn't been called.
177 178 if self.stream.closed() or stream.closed():
178 179 self.log.warn("zmq message arrived on closed channel")
179 180 self.close()
180 181 return
181 182 channel = getattr(stream, 'channel', None)
182 183 try:
183 184 msg = self._reserialize_reply(msg_list, channel=channel)
184 185 except Exception:
185 186 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
186 187 else:
187 188 self.write_message(msg, binary=isinstance(msg, bytes))
188 189
189 190 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
190 191 ping_callback = None
191 192 last_ping = 0
192 193 last_pong = 0
193 194
194 195 @property
195 196 def ping_interval(self):
196 197 """The interval for websocket keep-alive pings.
197 198
198 199 Set ws_ping_interval = 0 to disable pings.
199 200 """
200 201 return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)
201 202
202 203 @property
203 204 def ping_timeout(self):
204 205 """If no ping is received in this many milliseconds,
205 206 close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
206 207 Default is max of 3 pings or 30 seconds.
207 208 """
208 209 return self.settings.get('ws_ping_timeout',
209 210 max(3 * self.ping_interval, WS_PING_INTERVAL)
210 211 )
211 212
212 213 def set_default_headers(self):
213 214 """Undo the set_default_headers in IPythonHandler
214 215
215 216 which doesn't make sense for websockets
216 217 """
217 218 pass
218 219
219 220 def pre_get(self):
220 221 """Run before finishing the GET request
221 222
222 223 Extend this method to add logic that should fire before
223 224 the websocket finishes completing.
224 225 """
225 226 # authenticate the request before opening the websocket
226 227 if self.get_current_user() is None:
227 228 self.log.warn("Couldn't authenticate WebSocket connection")
228 229 raise web.HTTPError(403)
229 230
230 231 if self.get_argument('session_id', False):
231 232 self.session.session = cast_unicode(self.get_argument('session_id'))
232 233 else:
233 234 self.log.warn("No session ID specified")
234 235
235 236 @gen.coroutine
236 237 def get(self, *args, **kwargs):
237 238 # pre_get can be a coroutine in subclasses
238 239 # assign and yield in two step to avoid tornado 3 issues
239 240 res = self.pre_get()
240 241 yield gen.maybe_future(res)
241 242 super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
242 243
243 244 def initialize(self):
244 245 self.log.debug("Initializing websocket connection %s", self.request.path)
245 246 self.session = Session(config=self.config)
246 247
247 248 def open(self, *args, **kwargs):
248 249 self.log.debug("Opening websocket %s", self.request.path)
249 250
250 251 # start the pinging
251 252 if self.ping_interval > 0:
252 253 loop = ioloop.IOLoop.current()
253 254 self.last_ping = loop.time() # Remember time of last ping
254 255 self.last_pong = self.last_ping
255 256 self.ping_callback = ioloop.PeriodicCallback(
256 257 self.send_ping, self.ping_interval, io_loop=loop,
257 258 )
258 259 self.ping_callback.start()
259 260
260 261 def send_ping(self):
261 262 """send a ping to keep the websocket alive"""
262 263 if self.stream.closed() and self.ping_callback is not None:
263 264 self.ping_callback.stop()
264 265 return
265 266
266 267 # check for timeout on pong. Make sure that we really have sent a recent ping in
267 268 # case the machine with both server and client has been suspended since the last ping.
268 269 now = ioloop.IOLoop.current().time()
269 270 since_last_pong = 1e3 * (now - self.last_pong)
270 271 since_last_ping = 1e3 * (now - self.last_ping)
271 272 if since_last_ping < 2*self.ping_interval and since_last_pong > self.ping_timeout:
272 273 self.log.warn("WebSocket ping timeout after %i ms.", since_last_pong)
273 274 self.close()
274 275 return
275 276
276 277 self.ping(b'')
277 278 self.last_ping = now
278 279
279 280 def on_pong(self, data):
280 281 self.last_pong = ioloop.IOLoop.current().time()
General Comments 0
You need to be logged in to leave comments. Login now