##// END OF EJS Templates
only use zmq.jsonapi when talking to zmq sockets...
MinRK -
Show More
@@ -1,134 +1,134 b''
1 1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import json
7
6 8 try:
7 9 from urllib.parse import urlparse # Py 3
8 10 except ImportError:
9 11 from urlparse import urlparse # Py 2
10 12
11 13 try:
12 14 from http.cookies import SimpleCookie # Py 3
13 15 except ImportError:
14 16 from Cookie import SimpleCookie # Py 2
15 17 import logging
16 18 from tornado import web
17 19 from tornado import websocket
18 20
19 from zmq.utils import jsonapi
20
21 21 from IPython.kernel.zmq.session import Session
22 22 from IPython.utils.jsonutil import date_default
23 23 from IPython.utils.py3compat import PY3, cast_unicode
24 24
25 25 from .handlers import IPythonHandler
26 26
27 27
28 28 class ZMQStreamHandler(websocket.WebSocketHandler):
29 29
30 30 def same_origin(self):
31 31 """Check to see that origin and host match in the headers."""
32 32
33 33 # The difference between version 8 and 13 is that in 8 the
34 34 # client sends a "Sec-Websocket-Origin" header and in 13 it's
35 35 # simply "Origin".
36 36 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8"):
37 37 origin_header = self.request.headers.get("Sec-Websocket-Origin")
38 38 else:
39 39 origin_header = self.request.headers.get("Origin")
40 40
41 41 host = self.request.headers.get("Host")
42 42
43 43 # If no header is provided, assume we can't verify origin
44 44 if(origin_header is None or host is None):
45 45 return False
46 46
47 47 parsed_origin = urlparse(origin_header)
48 48 origin = parsed_origin.netloc
49 49
50 50 # Check to see that origin matches host directly, including ports
51 51 return origin == host
52 52
53 53 def clear_cookie(self, *args, **kwargs):
54 54 """meaningless for websockets"""
55 55 pass
56 56
57 57 def _reserialize_reply(self, msg_list):
58 58 """Reserialize a reply message using JSON.
59 59
60 60 This takes the msg list from the ZMQ socket, unserializes it using
61 61 self.session and then serializes the result using JSON. This method
62 62 should be used by self._on_zmq_reply to build messages that can
63 63 be sent back to the browser.
64 64 """
65 65 idents, msg_list = self.session.feed_identities(msg_list)
66 66 msg = self.session.unserialize(msg_list)
67 67 try:
68 68 msg['header'].pop('date')
69 69 except KeyError:
70 70 pass
71 71 try:
72 72 msg['parent_header'].pop('date')
73 73 except KeyError:
74 74 pass
75 75 msg.pop('buffers')
76 return jsonapi.dumps(msg, default=date_default)
76 return json.dumps(msg, default=date_default)
77 77
78 78 def _on_zmq_reply(self, msg_list):
79 79 # Sometimes this gets triggered when the on_close method is scheduled in the
80 80 # eventloop but hasn't been called.
81 81 if self.stream.closed(): return
82 82 try:
83 83 msg = self._reserialize_reply(msg_list)
84 84 except Exception:
85 85 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
86 86 else:
87 87 self.write_message(msg)
88 88
89 89 def allow_draft76(self):
90 90 """Allow draft 76, until browsers such as Safari update to RFC 6455.
91 91
92 92 This has been disabled by default in tornado in release 2.2.0, and
93 93 support will be removed in later versions.
94 94 """
95 95 return True
96 96
97 97
98 98 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
99 99
100 100 def open(self, kernel_id):
101 101 self.kernel_id = cast_unicode(kernel_id, 'ascii')
102 102 # Check to see that origin matches host directly, including ports
103 103 if not self.same_origin():
104 104 self.log.warn("Cross Origin WebSocket Attempt.")
105 105 raise web.HTTPError(404)
106 106
107 107 self.session = Session(config=self.config)
108 108 self.save_on_message = self.on_message
109 109 self.on_message = self.on_first_message
110 110
111 111 def _inject_cookie_message(self, msg):
112 112 """Inject the first message, which is the document cookie,
113 113 for authentication."""
114 114 if not PY3 and isinstance(msg, unicode):
115 115 # Cookie constructor doesn't accept unicode strings
116 116 # under Python 2.x for some reason
117 117 msg = msg.encode('utf8', 'replace')
118 118 try:
119 119 identity, msg = msg.split(':', 1)
120 120 self.session.session = cast_unicode(identity, 'ascii')
121 121 except Exception:
122 122 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
123 123
124 124 try:
125 125 self.request._cookies = SimpleCookie(msg)
126 126 except:
127 127 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
128 128
129 129 def on_first_message(self, msg):
130 130 self._inject_cookie_message(msg)
131 131 if self.get_current_user() is None:
132 132 self.log.warn("Couldn't authenticate WebSocket connection")
133 133 raise web.HTTPError(403)
134 134 self.on_message = self.save_on_message
@@ -1,72 +1,59 b''
1 """Tornado handlers for cluster web service.
1 """Tornado handlers for cluster web service."""
2 2
3 Authors:
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 5
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
6 import json
18 7
19 8 from tornado import web
20 9
21 from zmq.utils import jsonapi
22
23 10 from ...base.handlers import IPythonHandler
24 11
25 12 #-----------------------------------------------------------------------------
26 13 # Cluster handlers
27 14 #-----------------------------------------------------------------------------
28 15
29 16
30 17 class MainClusterHandler(IPythonHandler):
31 18
32 19 @web.authenticated
33 20 def get(self):
34 self.finish(jsonapi.dumps(self.cluster_manager.list_profiles()))
21 self.finish(json.dumps(self.cluster_manager.list_profiles()))
35 22
36 23
37 24 class ClusterProfileHandler(IPythonHandler):
38 25
39 26 @web.authenticated
40 27 def get(self, profile):
41 self.finish(jsonapi.dumps(self.cluster_manager.profile_info(profile)))
28 self.finish(json.dumps(self.cluster_manager.profile_info(profile)))
42 29
43 30
44 31 class ClusterActionHandler(IPythonHandler):
45 32
46 33 @web.authenticated
47 34 def post(self, profile, action):
48 35 cm = self.cluster_manager
49 36 if action == 'start':
50 37 n = self.get_argument('n', default=None)
51 38 if not n:
52 39 data = cm.start_cluster(profile)
53 40 else:
54 41 data = cm.start_cluster(profile, int(n))
55 42 if action == 'stop':
56 43 data = cm.stop_cluster(profile)
57 self.finish(jsonapi.dumps(data))
44 self.finish(json.dumps(data))
58 45
59 46
60 47 #-----------------------------------------------------------------------------
61 48 # URL to handler mappings
62 49 #-----------------------------------------------------------------------------
63 50
64 51
65 52 _cluster_action_regex = r"(?P<action>start|stop)"
66 53 _profile_regex = r"(?P<profile>[^\/]+)" # there is almost no text that is invalid
67 54
68 55 default_handlers = [
69 56 (r"/clusters", MainClusterHandler),
70 57 (r"/clusters/%s/%s" % (_profile_regex, _cluster_action_regex), ClusterActionHandler),
71 58 (r"/clusters/%s" % _profile_regex, ClusterProfileHandler),
72 59 ]
@@ -1,218 +1,217 b''
1 1 """Tornado handlers for kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import json
6 7 import logging
7 8 from tornado import web
8 9
9 from zmq.utils import jsonapi
10
11 10 from IPython.utils.jsonutil import date_default
12 11 from IPython.utils.py3compat import string_types
13 12 from IPython.html.utils import url_path_join, url_escape
14 13
15 14 from ...base.handlers import IPythonHandler, json_errors
16 15 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
17 16
18 17 from IPython.core.release import kernel_protocol_version
19 18
20 19 class MainKernelHandler(IPythonHandler):
21 20
22 21 @web.authenticated
23 22 @json_errors
24 23 def get(self):
25 24 km = self.kernel_manager
26 self.finish(jsonapi.dumps(km.list_kernels()))
25 self.finish(json.dumps(km.list_kernels()))
27 26
28 27 @web.authenticated
29 28 @json_errors
30 29 def post(self):
31 30 km = self.kernel_manager
32 31 kernel_id = km.start_kernel()
33 32 model = km.kernel_model(kernel_id)
34 33 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
35 34 self.set_header('Location', url_escape(location))
36 35 self.set_status(201)
37 self.finish(jsonapi.dumps(model))
36 self.finish(json.dumps(model))
38 37
39 38
40 39 class KernelHandler(IPythonHandler):
41 40
42 41 SUPPORTED_METHODS = ('DELETE', 'GET')
43 42
44 43 @web.authenticated
45 44 @json_errors
46 45 def get(self, kernel_id):
47 46 km = self.kernel_manager
48 47 km._check_kernel_id(kernel_id)
49 48 model = km.kernel_model(kernel_id)
50 self.finish(jsonapi.dumps(model))
49 self.finish(json.dumps(model))
51 50
52 51 @web.authenticated
53 52 @json_errors
54 53 def delete(self, kernel_id):
55 54 km = self.kernel_manager
56 55 km.shutdown_kernel(kernel_id)
57 56 self.set_status(204)
58 57 self.finish()
59 58
60 59
61 60 class KernelActionHandler(IPythonHandler):
62 61
63 62 @web.authenticated
64 63 @json_errors
65 64 def post(self, kernel_id, action):
66 65 km = self.kernel_manager
67 66 if action == 'interrupt':
68 67 km.interrupt_kernel(kernel_id)
69 68 self.set_status(204)
70 69 if action == 'restart':
71 70 km.restart_kernel(kernel_id)
72 71 model = km.kernel_model(kernel_id)
73 72 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
74 self.write(jsonapi.dumps(model))
73 self.write(json.dumps(model))
75 74 self.finish()
76 75
77 76
78 77 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
79 78
80 79 def create_stream(self):
81 80 km = self.kernel_manager
82 81 meth = getattr(km, 'connect_%s' % self.channel)
83 82 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
84 83 # Create a kernel_info channel to query the kernel protocol version.
85 84 # This channel will be closed after the kernel_info reply is received.
86 85 self.kernel_info_channel = None
87 86 self.kernel_info_channel = km.connect_shell(self.kernel_id)
88 87 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
89 88 self._request_kernel_info()
90 89
91 90 def _request_kernel_info(self):
92 91 """send a request for kernel_info"""
93 92 self.log.debug("requesting kernel info")
94 93 self.session.send(self.kernel_info_channel, "kernel_info_request")
95 94
96 95 def _handle_kernel_info_reply(self, msg):
97 96 """process the kernel_info_reply
98 97
99 98 enabling msg spec adaptation, if necessary
100 99 """
101 100 idents,msg = self.session.feed_identities(msg)
102 101 try:
103 102 msg = self.session.unserialize(msg)
104 103 except:
105 104 self.log.error("Bad kernel_info reply", exc_info=True)
106 105 self._request_kernel_info()
107 106 return
108 107 else:
109 108 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
110 109 self.log.error("Kernel info request failed, assuming current %s", msg['content'])
111 110 else:
112 111 protocol_version = msg['content']['protocol_version']
113 112 if protocol_version != kernel_protocol_version:
114 113 self.session.adapt_version = int(protocol_version.split('.')[0])
115 114 self.log.info("adapting kernel to %s" % protocol_version)
116 115 self.kernel_info_channel.close()
117 116 self.kernel_info_channel = None
118 117
119 118
120 119 def initialize(self, *args, **kwargs):
121 120 self.zmq_stream = None
122 121
123 122 def on_first_message(self, msg):
124 123 try:
125 124 super(ZMQChannelHandler, self).on_first_message(msg)
126 125 except web.HTTPError:
127 126 self.close()
128 127 return
129 128 try:
130 129 self.create_stream()
131 130 except web.HTTPError:
132 131 # WebSockets don't response to traditional error codes so we
133 132 # close the connection.
134 133 if not self.stream.closed():
135 134 self.stream.close()
136 135 self.close()
137 136 else:
138 137 self.zmq_stream.on_recv(self._on_zmq_reply)
139 138
140 139 def on_message(self, msg):
141 msg = jsonapi.loads(msg)
140 msg = json.loads(msg)
142 141 self.session.send(self.zmq_stream, msg)
143 142
144 143 def on_close(self):
145 144 # This method can be called twice, once by self.kernel_died and once
146 145 # from the WebSocket close event. If the WebSocket connection is
147 146 # closed before the ZMQ streams are setup, they could be None.
148 147 if self.zmq_stream is not None and not self.zmq_stream.closed():
149 148 self.zmq_stream.on_recv(None)
150 149 # close the socket directly, don't wait for the stream
151 150 socket = self.zmq_stream.socket
152 151 self.zmq_stream.close()
153 152 socket.close()
154 153
155 154
156 155 class IOPubHandler(ZMQChannelHandler):
157 156 channel = 'iopub'
158 157
159 158 def create_stream(self):
160 159 super(IOPubHandler, self).create_stream()
161 160 km = self.kernel_manager
162 161 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
163 162 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
164 163
165 164 def on_close(self):
166 165 km = self.kernel_manager
167 166 if self.kernel_id in km:
168 167 km.remove_restart_callback(
169 168 self.kernel_id, self.on_kernel_restarted,
170 169 )
171 170 km.remove_restart_callback(
172 171 self.kernel_id, self.on_restart_failed, 'dead',
173 172 )
174 173 super(IOPubHandler, self).on_close()
175 174
176 175 def _send_status_message(self, status):
177 176 msg = self.session.msg("status",
178 177 {'execution_state': status}
179 178 )
180 self.write_message(jsonapi.dumps(msg, default=date_default))
179 self.write_message(json.dumps(msg, default=date_default))
181 180
182 181 def on_kernel_restarted(self):
183 182 logging.warn("kernel %s restarted", self.kernel_id)
184 183 self._send_status_message('restarting')
185 184
186 185 def on_restart_failed(self):
187 186 logging.error("kernel %s restarted failed!", self.kernel_id)
188 187 self._send_status_message('dead')
189 188
190 189 def on_message(self, msg):
191 190 """IOPub messages make no sense"""
192 191 pass
193 192
194 193
195 194 class ShellHandler(ZMQChannelHandler):
196 195 channel = 'shell'
197 196
198 197
199 198 class StdinHandler(ZMQChannelHandler):
200 199 channel = 'stdin'
201 200
202 201
203 202 #-----------------------------------------------------------------------------
204 203 # URL to handler mappings
205 204 #-----------------------------------------------------------------------------
206 205
207 206
208 207 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
209 208 _kernel_action_regex = r"(?P<action>restart|interrupt)"
210 209
211 210 default_handlers = [
212 211 (r"/api/kernels", MainKernelHandler),
213 212 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
214 213 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
215 214 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
216 215 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
217 216 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
218 217 ]
General Comments 0
You need to be logged in to leave comments. Login now