##// END OF EJS Templates
Check time of last ping before timing out a missing pong.
Richard Everson -
Show More
@@ -1,199 +1,205 b''
1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7
7
8 try:
8 try:
9 from urllib.parse import urlparse # Py 3
9 from urllib.parse import urlparse # Py 3
10 except ImportError:
10 except ImportError:
11 from urlparse import urlparse # Py 2
11 from urlparse import urlparse # Py 2
12
12
13 try:
13 try:
14 from http.cookies import SimpleCookie # Py 3
14 from http.cookies import SimpleCookie # Py 3
15 except ImportError:
15 except ImportError:
16 from Cookie import SimpleCookie # Py 2
16 from Cookie import SimpleCookie # Py 2
17 import logging
17 import logging
18
18
19 import tornado
19 import tornado
20 from tornado import ioloop
20 from tornado import ioloop
21 from tornado import web
21 from tornado import web
22 from tornado import websocket
22 from tornado import websocket
23
23
24 from IPython.kernel.zmq.session import Session
24 from IPython.kernel.zmq.session import Session
25 from IPython.utils.jsonutil import date_default
25 from IPython.utils.jsonutil import date_default
26 from IPython.utils.py3compat import PY3, cast_unicode
26 from IPython.utils.py3compat import PY3, cast_unicode
27
27
28 from .handlers import IPythonHandler
28 from .handlers import IPythonHandler
29
29
30
30
31 class ZMQStreamHandler(websocket.WebSocketHandler):
31 class ZMQStreamHandler(websocket.WebSocketHandler):
32
32
33 def check_origin(self, origin):
33 def check_origin(self, origin):
34 """Check Origin == Host or Access-Control-Allow-Origin.
34 """Check Origin == Host or Access-Control-Allow-Origin.
35
35
36 Tornado >= 4 calls this method automatically, raising 403 if it returns False.
36 Tornado >= 4 calls this method automatically, raising 403 if it returns False.
37 We call it explicitly in `open` on Tornado < 4.
37 We call it explicitly in `open` on Tornado < 4.
38 """
38 """
39 if self.allow_origin == '*':
39 if self.allow_origin == '*':
40 return True
40 return True
41
41
42 host = self.request.headers.get("Host")
42 host = self.request.headers.get("Host")
43
43
44 # If no header is provided, assume we can't verify origin
44 # If no header is provided, assume we can't verify origin
45 if(origin is None or host is None):
45 if(origin is None or host is None):
46 return False
46 return False
47
47
48 host_origin = "{0}://{1}".format(self.request.protocol, host)
48 host_origin = "{0}://{1}".format(self.request.protocol, host)
49
49
50 # OK if origin matches host
50 # OK if origin matches host
51 if origin == host_origin:
51 if origin == host_origin:
52 return True
52 return True
53
53
54 # Check CORS headers
54 # Check CORS headers
55 if self.allow_origin:
55 if self.allow_origin:
56 return self.allow_origin == origin
56 return self.allow_origin == origin
57 elif self.allow_origin_pat:
57 elif self.allow_origin_pat:
58 return bool(self.allow_origin_pat.match(origin))
58 return bool(self.allow_origin_pat.match(origin))
59 else:
59 else:
60 # No CORS headers deny the request
60 # No CORS headers deny the request
61 return False
61 return False
62
62
63 def clear_cookie(self, *args, **kwargs):
63 def clear_cookie(self, *args, **kwargs):
64 """meaningless for websockets"""
64 """meaningless for websockets"""
65 pass
65 pass
66
66
67 def _reserialize_reply(self, msg_list):
67 def _reserialize_reply(self, msg_list):
68 """Reserialize a reply message using JSON.
68 """Reserialize a reply message using JSON.
69
69
70 This takes the msg list from the ZMQ socket, unserializes it using
70 This takes the msg list from the ZMQ socket, unserializes it using
71 self.session and then serializes the result using JSON. This method
71 self.session and then serializes the result using JSON. This method
72 should be used by self._on_zmq_reply to build messages that can
72 should be used by self._on_zmq_reply to build messages that can
73 be sent back to the browser.
73 be sent back to the browser.
74 """
74 """
75 idents, msg_list = self.session.feed_identities(msg_list)
75 idents, msg_list = self.session.feed_identities(msg_list)
76 msg = self.session.unserialize(msg_list)
76 msg = self.session.unserialize(msg_list)
77 try:
77 try:
78 msg['header'].pop('date')
78 msg['header'].pop('date')
79 except KeyError:
79 except KeyError:
80 pass
80 pass
81 try:
81 try:
82 msg['parent_header'].pop('date')
82 msg['parent_header'].pop('date')
83 except KeyError:
83 except KeyError:
84 pass
84 pass
85 msg.pop('buffers')
85 msg.pop('buffers')
86 return json.dumps(msg, default=date_default)
86 return json.dumps(msg, default=date_default)
87
87
88 def _on_zmq_reply(self, msg_list):
88 def _on_zmq_reply(self, msg_list):
89 # Sometimes this gets triggered when the on_close method is scheduled in the
89 # Sometimes this gets triggered when the on_close method is scheduled in the
90 # eventloop but hasn't been called.
90 # eventloop but hasn't been called.
91 if self.stream.closed(): return
91 if self.stream.closed(): return
92 try:
92 try:
93 msg = self._reserialize_reply(msg_list)
93 msg = self._reserialize_reply(msg_list)
94 except Exception:
94 except Exception:
95 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
95 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
96 else:
96 else:
97 self.write_message(msg)
97 self.write_message(msg)
98
98
99 def allow_draft76(self):
99 def allow_draft76(self):
100 """Allow draft 76, until browsers such as Safari update to RFC 6455.
100 """Allow draft 76, until browsers such as Safari update to RFC 6455.
101
101
102 This has been disabled by default in tornado in release 2.2.0, and
102 This has been disabled by default in tornado in release 2.2.0, and
103 support will be removed in later versions.
103 support will be removed in later versions.
104 """
104 """
105 return True
105 return True
106
106
107 # ping interval for keeping websockets alive (30 seconds)
107 # ping interval for keeping websockets alive (30 seconds)
108 WS_PING_INTERVAL = 30000
108 WS_PING_INTERVAL = 30000
109
109
110 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
110 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
111 ping_callback = None
111 ping_callback = None
112 last_ping = 0
112 last_pong = 0
113 last_pong = 0
113
114
114 @property
115 @property
115 def ping_interval(self):
116 def ping_interval(self):
116 """The interval for websocket keep-alive pings.
117 """The interval for websocket keep-alive pings.
117
118
118 Set ws_ping_interval = 0 to disable pings.
119 Set ws_ping_interval = 0 to disable pings.
119 """
120 """
120 return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)
121 return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)
121
122
122 @property
123 @property
123 def ping_timeout(self):
124 def ping_timeout(self):
124 """If no ping is received in this many milliseconds,
125 """If no ping is received in this many milliseconds,
125 close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
126 close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
126 Default is max of 3 pings or 30 seconds.
127 Default is max of 3 pings or 30 seconds.
127 """
128 """
128 return self.settings.get('ws_ping_timeout',
129 return self.settings.get('ws_ping_timeout',
129 max(3 * self.ping_interval, WS_PING_INTERVAL)
130 max(3 * self.ping_interval, WS_PING_INTERVAL)
130 )
131 )
131
132
132 def set_default_headers(self):
133 def set_default_headers(self):
133 """Undo the set_default_headers in IPythonHandler
134 """Undo the set_default_headers in IPythonHandler
134
135
135 which doesn't make sense for websockets
136 which doesn't make sense for websockets
136 """
137 """
137 pass
138 pass
138
139
139 def open(self, kernel_id):
140 def open(self, kernel_id):
140 self.kernel_id = cast_unicode(kernel_id, 'ascii')
141 self.kernel_id = cast_unicode(kernel_id, 'ascii')
141 # Check to see that origin matches host directly, including ports
142 # Check to see that origin matches host directly, including ports
142 # Tornado 4 already does CORS checking
143 # Tornado 4 already does CORS checking
143 if tornado.version_info[0] < 4:
144 if tornado.version_info[0] < 4:
144 if not self.check_origin(self.get_origin()):
145 if not self.check_origin(self.get_origin()):
145 self.log.warn("Cross Origin WebSocket Attempt from %s", self.get_origin())
146 self.log.warn("Cross Origin WebSocket Attempt from %s", self.get_origin())
146 raise web.HTTPError(403)
147 raise web.HTTPError(403)
147
148
148 self.session = Session(config=self.config)
149 self.session = Session(config=self.config)
149 self.save_on_message = self.on_message
150 self.save_on_message = self.on_message
150 self.on_message = self.on_first_message
151 self.on_message = self.on_first_message
151
152
152 # start the pinging
153 # start the pinging
153 if self.ping_interval > 0:
154 if self.ping_interval > 0:
154 self.last_pong = ioloop.IOLoop.instance().time()
155 self.last_ping = ioloop.IOLoop.instance().time() # Remember time of last ping
156 self.last_pong = self.last_ping
155 self.ping_callback = ioloop.PeriodicCallback(self.send_ping, self.ping_interval)
157 self.ping_callback = ioloop.PeriodicCallback(self.send_ping, self.ping_interval)
156 self.ping_callback.start()
158 self.ping_callback.start()
157
159
158 def send_ping(self):
160 def send_ping(self):
159 """send a ping to keep the websocket alive"""
161 """send a ping to keep the websocket alive"""
160 if self.stream.closed() and self.ping_callback is not None:
162 if self.stream.closed() and self.ping_callback is not None:
161 self.ping_callback.stop()
163 self.ping_callback.stop()
162 return
164 return
163
165
164 # check for timeout on pong
166 # check for timeout on pong. Make sure that we really have sent a recent ping in
165 since_last_pong = 1e3 * (ioloop.IOLoop.instance().time() - self.last_pong)
167 # case the machine with both server and client has been suspended since the last ping.
166 if since_last_pong > self.ping_timeout:
168 now = ioloop.IOLoop.instance().time()
169 since_last_pong = 1e3 * (now - self.last_pong)
170 since_last_ping = 1e3 * (now - self.last_ping)
171 if since_last_ping < 2*self.ping_interval and since_last_pong > self.ping_timeout:
167 self.log.warn("WebSocket ping timeout after %i ms.", since_last_pong)
172 self.log.warn("WebSocket ping timeout after %i ms.", since_last_pong)
168 self.close()
173 self.close()
169 return
174 return
170
175
171 self.ping(b'')
176 self.ping(b'')
177 self.last_ping = now
172
178
173 def on_pong(self, data):
179 def on_pong(self, data):
174 self.last_pong = ioloop.IOLoop.instance().time()
180 self.last_pong = ioloop.IOLoop.instance().time()
175
181
176 def _inject_cookie_message(self, msg):
182 def _inject_cookie_message(self, msg):
177 """Inject the first message, which is the document cookie,
183 """Inject the first message, which is the document cookie,
178 for authentication."""
184 for authentication."""
179 if not PY3 and isinstance(msg, unicode):
185 if not PY3 and isinstance(msg, unicode):
180 # Cookie constructor doesn't accept unicode strings
186 # Cookie constructor doesn't accept unicode strings
181 # under Python 2.x for some reason
187 # under Python 2.x for some reason
182 msg = msg.encode('utf8', 'replace')
188 msg = msg.encode('utf8', 'replace')
183 try:
189 try:
184 identity, msg = msg.split(':', 1)
190 identity, msg = msg.split(':', 1)
185 self.session.session = cast_unicode(identity, 'ascii')
191 self.session.session = cast_unicode(identity, 'ascii')
186 except Exception:
192 except Exception:
187 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
193 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
188
194
189 try:
195 try:
190 self.request._cookies = SimpleCookie(msg)
196 self.request._cookies = SimpleCookie(msg)
191 except:
197 except:
192 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
198 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
193
199
194 def on_first_message(self, msg):
200 def on_first_message(self, msg):
195 self._inject_cookie_message(msg)
201 self._inject_cookie_message(msg)
196 if self.get_current_user() is None:
202 if self.get_current_user() is None:
197 self.log.warn("Couldn't authenticate WebSocket connection")
203 self.log.warn("Couldn't authenticate WebSocket connection")
198 raise web.HTTPError(403)
204 raise web.HTTPError(403)
199 self.on_message = self.save_on_message
205 self.on_message = self.save_on_message
General Comments 0
You need to be logged in to leave comments. Login now