##// END OF EJS Templates
Merge pull request #7389 from minrk/one-websocket...
Brian E. Granger -
r19850:62757cfc merge
parent child Browse files
Show More
@@ -0,0 +1,3 b''
1 * The notebook now uses a single websocket at `/kernels/<kernel-id>/channels` instead of separate
2 `/kernels/<kernel-id>/{shell|iopub|stdin}` channels. Messages on each channel are identified by a
3 `channel` key in the message dict, for both send and recv.
@@ -1,260 +1,263 b''
1 1 # coding: utf-8
2 2 """Tornado handlers for WebSocket <-> ZMQ sockets."""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 import os
8 8 import json
9 9 import struct
10 10 import warnings
11 11
12 12 try:
13 13 from urllib.parse import urlparse # Py 3
14 14 except ImportError:
15 15 from urlparse import urlparse # Py 2
16 16
17 17 import tornado
18 18 from tornado import gen, ioloop, web
19 19 from tornado.websocket import WebSocketHandler
20 20
21 21 from IPython.kernel.zmq.session import Session
22 22 from IPython.utils.jsonutil import date_default, extract_dates
23 23 from IPython.utils.py3compat import cast_unicode
24 24
25 25 from .handlers import IPythonHandler
26 26
27 27 def serialize_binary_message(msg):
28 28 """serialize a message as a binary blob
29 29
30 30 Header:
31 31
32 32 4 bytes: number of msg parts (nbufs) as 32b int
33 33 4 * nbufs bytes: offset for each buffer as integer as 32b int
34 34
35 35 Offsets are from the start of the buffer, including the header.
36 36
37 37 Returns
38 38 -------
39 39
40 40 The message serialized to bytes.
41 41
42 42 """
43 43 # don't modify msg or buffer list in-place
44 44 msg = msg.copy()
45 45 buffers = list(msg.pop('buffers'))
46 46 bmsg = json.dumps(msg, default=date_default).encode('utf8')
47 47 buffers.insert(0, bmsg)
48 48 nbufs = len(buffers)
49 49 offsets = [4 * (nbufs + 1)]
50 50 for buf in buffers[:-1]:
51 51 offsets.append(offsets[-1] + len(buf))
52 52 offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
53 53 buffers.insert(0, offsets_buf)
54 54 return b''.join(buffers)
55 55
56 56
57 57 def deserialize_binary_message(bmsg):
58 58 """deserialize a message from a binary blog
59 59
60 60 Header:
61 61
62 62 4 bytes: number of msg parts (nbufs) as 32b int
63 63 4 * nbufs bytes: offset for each buffer as integer as 32b int
64 64
65 65 Offsets are from the start of the buffer, including the header.
66 66
67 67 Returns
68 68 -------
69 69
70 70 message dictionary
71 71 """
72 72 nbufs = struct.unpack('!i', bmsg[:4])[0]
73 73 offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
74 74 offsets.append(None)
75 75 bufs = []
76 76 for start, stop in zip(offsets[:-1], offsets[1:]):
77 77 bufs.append(bmsg[start:stop])
78 78 msg = json.loads(bufs[0].decode('utf8'))
79 79 msg['header'] = extract_dates(msg['header'])
80 80 msg['parent_header'] = extract_dates(msg['parent_header'])
81 81 msg['buffers'] = bufs[1:]
82 82 return msg
83 83
84 84 # ping interval for keeping websockets alive (30 seconds)
85 85 WS_PING_INTERVAL = 30000
86 86
87 87 if os.environ.get('IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS', False):
88 88 warnings.warn("""Allowing draft76 websocket connections!
89 89 This should only be done for testing with phantomjs!""")
90 90 from IPython.html import allow76
91 91 WebSocketHandler = allow76.AllowDraftWebSocketHandler
92 92 # draft 76 doesn't support ping
93 93 WS_PING_INTERVAL = 0
94 94
95 95 class ZMQStreamHandler(WebSocketHandler):
96 96
97 97 def check_origin(self, origin):
98 98 """Check Origin == Host or Access-Control-Allow-Origin.
99 99
100 100 Tornado >= 4 calls this method automatically, raising 403 if it returns False.
101 101 We call it explicitly in `open` on Tornado < 4.
102 102 """
103 103 if self.allow_origin == '*':
104 104 return True
105 105
106 106 host = self.request.headers.get("Host")
107 107
108 108 # If no header is provided, assume we can't verify origin
109 109 if origin is None:
110 110 self.log.warn("Missing Origin header, rejecting WebSocket connection.")
111 111 return False
112 112 if host is None:
113 113 self.log.warn("Missing Host header, rejecting WebSocket connection.")
114 114 return False
115 115
116 116 origin = origin.lower()
117 117 origin_host = urlparse(origin).netloc
118 118
119 119 # OK if origin matches host
120 120 if origin_host == host:
121 121 return True
122 122
123 123 # Check CORS headers
124 124 if self.allow_origin:
125 125 allow = self.allow_origin == origin
126 126 elif self.allow_origin_pat:
127 127 allow = bool(self.allow_origin_pat.match(origin))
128 128 else:
129 129 # No CORS headers deny the request
130 130 allow = False
131 131 if not allow:
132 132 self.log.warn("Blocking Cross Origin WebSocket Attempt. Origin: %s, Host: %s",
133 133 origin, host,
134 134 )
135 135 return allow
136 136
137 137 def clear_cookie(self, *args, **kwargs):
138 138 """meaningless for websockets"""
139 139 pass
140 140
141 def _reserialize_reply(self, msg_list):
141 def _reserialize_reply(self, msg_list, channel=None):
142 142 """Reserialize a reply message using JSON.
143 143
144 144 This takes the msg list from the ZMQ socket, deserializes it using
145 145 self.session and then serializes the result using JSON. This method
146 146 should be used by self._on_zmq_reply to build messages that can
147 147 be sent back to the browser.
148 148 """
149 149 idents, msg_list = self.session.feed_identities(msg_list)
150 150 msg = self.session.deserialize(msg_list)
151 if channel:
152 msg['channel'] = channel
151 153 if msg['buffers']:
152 154 buf = serialize_binary_message(msg)
153 155 return buf
154 156 else:
155 157 smsg = json.dumps(msg, default=date_default)
156 158 return cast_unicode(smsg)
157 159
158 def _on_zmq_reply(self, msg_list):
160 def _on_zmq_reply(self, stream, msg_list):
159 161 # Sometimes this gets triggered when the on_close method is scheduled in the
160 162 # eventloop but hasn't been called.
161 if self.stream.closed(): return
163 if stream.closed(): return
164 channel = getattr(stream, 'channel', None)
162 165 try:
163 msg = self._reserialize_reply(msg_list)
166 msg = self._reserialize_reply(msg_list, channel=channel)
164 167 except Exception:
165 168 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
166 169 else:
167 170 self.write_message(msg, binary=isinstance(msg, bytes))
168 171
169 172 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
170 173 ping_callback = None
171 174 last_ping = 0
172 175 last_pong = 0
173 176
174 177 @property
175 178 def ping_interval(self):
176 179 """The interval for websocket keep-alive pings.
177 180
178 181 Set ws_ping_interval = 0 to disable pings.
179 182 """
180 183 return self.settings.get('ws_ping_interval', WS_PING_INTERVAL)
181 184
182 185 @property
183 186 def ping_timeout(self):
184 187 """If no ping is received in this many milliseconds,
185 188 close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
186 189 Default is max of 3 pings or 30 seconds.
187 190 """
188 191 return self.settings.get('ws_ping_timeout',
189 192 max(3 * self.ping_interval, WS_PING_INTERVAL)
190 193 )
191 194
192 195 def set_default_headers(self):
193 196 """Undo the set_default_headers in IPythonHandler
194 197
195 198 which doesn't make sense for websockets
196 199 """
197 200 pass
198 201
199 202 def pre_get(self):
200 203 """Run before finishing the GET request
201 204
202 205 Extend this method to add logic that should fire before
203 206 the websocket finishes completing.
204 207 """
205 208 # authenticate the request before opening the websocket
206 209 if self.get_current_user() is None:
207 210 self.log.warn("Couldn't authenticate WebSocket connection")
208 211 raise web.HTTPError(403)
209 212
210 213 if self.get_argument('session_id', False):
211 214 self.session.session = cast_unicode(self.get_argument('session_id'))
212 215 else:
213 216 self.log.warn("No session ID specified")
214 217
215 218 @gen.coroutine
216 219 def get(self, *args, **kwargs):
217 220 # pre_get can be a coroutine in subclasses
218 221 # assign and yield in two step to avoid tornado 3 issues
219 222 res = self.pre_get()
220 223 yield gen.maybe_future(res)
221 224 super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
222 225
223 226 def initialize(self):
224 227 self.log.debug("Initializing websocket connection %s", self.request.path)
225 228 self.session = Session(config=self.config)
226 229
227 230 def open(self, *args, **kwargs):
228 231 self.log.debug("Opening websocket %s", self.request.path)
229 232
230 233 # start the pinging
231 234 if self.ping_interval > 0:
232 235 loop = ioloop.IOLoop.current()
233 236 self.last_ping = loop.time() # Remember time of last ping
234 237 self.last_pong = self.last_ping
235 238 self.ping_callback = ioloop.PeriodicCallback(
236 239 self.send_ping, self.ping_interval, io_loop=loop,
237 240 )
238 241 self.ping_callback.start()
239 242
240 243 def send_ping(self):
241 244 """send a ping to keep the websocket alive"""
242 245 if self.stream.closed() and self.ping_callback is not None:
243 246 self.ping_callback.stop()
244 247 return
245 248
246 249 # check for timeout on pong. Make sure that we really have sent a recent ping in
247 250 # case the machine with both server and client has been suspended since the last ping.
248 251 now = ioloop.IOLoop.current().time()
249 252 since_last_pong = 1e3 * (now - self.last_pong)
250 253 since_last_ping = 1e3 * (now - self.last_ping)
251 254 if since_last_ping < 2*self.ping_interval and since_last_pong > self.ping_timeout:
252 255 self.log.warn("WebSocket ping timeout after %i ms.", since_last_pong)
253 256 self.close()
254 257 return
255 258
256 259 self.ping(b'')
257 260 self.last_ping = now
258 261
259 262 def on_pong(self, data):
260 263 self.last_pong = ioloop.IOLoop.current().time()
@@ -1,295 +1,279 b''
1 1 """Tornado handlers for kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7 import logging
8 8 from tornado import gen, web
9 9 from tornado.concurrent import Future
10 10 from tornado.ioloop import IOLoop
11 11
12 12 from IPython.utils.jsonutil import date_default
13 13 from IPython.utils.py3compat import cast_unicode
14 14 from IPython.html.utils import url_path_join, url_escape
15 15
16 16 from ...base.handlers import IPythonHandler, json_errors
17 17 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
18 18
19 19 from IPython.core.release import kernel_protocol_version
20 20
21 21 class MainKernelHandler(IPythonHandler):
22 22
23 23 @web.authenticated
24 24 @json_errors
25 25 def get(self):
26 26 km = self.kernel_manager
27 27 self.finish(json.dumps(km.list_kernels()))
28 28
29 29 @web.authenticated
30 30 @json_errors
31 31 def post(self):
32 32 km = self.kernel_manager
33 33 model = self.get_json_body()
34 34 if model is None:
35 35 model = {
36 36 'name': km.default_kernel_name
37 37 }
38 38 else:
39 39 model.setdefault('name', km.default_kernel_name)
40 40
41 41 kernel_id = km.start_kernel(kernel_name=model['name'])
42 42 model = km.kernel_model(kernel_id)
43 43 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
44 44 self.set_header('Location', url_escape(location))
45 45 self.set_status(201)
46 46 self.finish(json.dumps(model))
47 47
48 48
49 49 class KernelHandler(IPythonHandler):
50 50
51 51 SUPPORTED_METHODS = ('DELETE', 'GET')
52 52
53 53 @web.authenticated
54 54 @json_errors
55 55 def get(self, kernel_id):
56 56 km = self.kernel_manager
57 57 km._check_kernel_id(kernel_id)
58 58 model = km.kernel_model(kernel_id)
59 59 self.finish(json.dumps(model))
60 60
61 61 @web.authenticated
62 62 @json_errors
63 63 def delete(self, kernel_id):
64 64 km = self.kernel_manager
65 65 km.shutdown_kernel(kernel_id)
66 66 self.set_status(204)
67 67 self.finish()
68 68
69 69
70 70 class KernelActionHandler(IPythonHandler):
71 71
72 72 @web.authenticated
73 73 @json_errors
74 74 def post(self, kernel_id, action):
75 75 km = self.kernel_manager
76 76 if action == 'interrupt':
77 77 km.interrupt_kernel(kernel_id)
78 78 self.set_status(204)
79 79 if action == 'restart':
80 80 km.restart_kernel(kernel_id)
81 81 model = km.kernel_model(kernel_id)
82 82 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
83 83 self.write(json.dumps(model))
84 84 self.finish()
85 85
86 86
87 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
88 88
89 89 @property
90 90 def kernel_info_timeout(self):
91 91 return self.settings.get('kernel_info_timeout', 10)
92 92
93 93 def __repr__(self):
94 94 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
95 95
96 96 def create_stream(self):
97 97 km = self.kernel_manager
98 meth = getattr(km, 'connect_%s' % self.channel)
99 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
98 identity = self.session.bsession
99 for channel in ('shell', 'iopub', 'stdin'):
100 meth = getattr(km, 'connect_' + channel)
101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
102 stream.channel = channel
103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
100 105
101 106 def request_kernel_info(self):
102 107 """send a request for kernel_info"""
103 108 km = self.kernel_manager
104 109 kernel = km.get_kernel(self.kernel_id)
105 110 try:
106 111 # check for previous request
107 112 future = kernel._kernel_info_future
108 113 except AttributeError:
109 114 self.log.debug("Requesting kernel info from %s", self.kernel_id)
110 115 # Create a kernel_info channel to query the kernel protocol version.
111 116 # This channel will be closed after the kernel_info reply is received.
112 117 if self.kernel_info_channel is None:
113 118 self.kernel_info_channel = km.connect_shell(self.kernel_id)
114 119 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
115 120 self.session.send(self.kernel_info_channel, "kernel_info_request")
116 121 # store the future on the kernel, so only one request is sent
117 122 kernel._kernel_info_future = self._kernel_info_future
118 123 else:
119 124 if not future.done():
120 125 self.log.debug("Waiting for pending kernel_info request")
121 126 future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
122 127 return self._kernel_info_future
123 128
124 129 def _handle_kernel_info_reply(self, msg):
125 130 """process the kernel_info_reply
126 131
127 132 enabling msg spec adaptation, if necessary
128 133 """
129 134 idents,msg = self.session.feed_identities(msg)
130 135 try:
131 136 msg = self.session.deserialize(msg)
132 137 except:
133 138 self.log.error("Bad kernel_info reply", exc_info=True)
134 139 self._kernel_info_future.set_result({})
135 140 return
136 141 else:
137 142 info = msg['content']
138 143 self.log.debug("Received kernel info: %s", info)
139 144 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
140 145 self.log.error("Kernel info request failed, assuming current %s", info)
141 146 info = {}
142 147 self._finish_kernel_info(info)
143 148
144 149 # close the kernel_info channel, we don't need it anymore
145 150 if self.kernel_info_channel:
146 151 self.kernel_info_channel.close()
147 152 self.kernel_info_channel = None
148 153
149 154 def _finish_kernel_info(self, info):
150 155 """Finish handling kernel_info reply
151 156
152 157 Set up protocol adaptation, if needed,
153 158 and signal that connection can continue.
154 159 """
155 160 protocol_version = info.get('protocol_version', kernel_protocol_version)
156 161 if protocol_version != kernel_protocol_version:
157 162 self.session.adapt_version = int(protocol_version.split('.')[0])
158 163 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
159 164 if not self._kernel_info_future.done():
160 165 self._kernel_info_future.set_result(info)
161 166
162 167 def initialize(self):
163 super(ZMQChannelHandler, self).initialize()
168 super(ZMQChannelsHandler, self).initialize()
164 169 self.zmq_stream = None
170 self.channels = {}
165 171 self.kernel_id = None
166 172 self.kernel_info_channel = None
167 173 self._kernel_info_future = Future()
168 174
169 175 @gen.coroutine
170 176 def pre_get(self):
171 177 # authenticate first
172 super(ZMQChannelHandler, self).pre_get()
178 super(ZMQChannelsHandler, self).pre_get()
173 179 # then request kernel info, waiting up to a certain time before giving up.
174 180 # We don't want to wait forever, because browsers don't take it well when
175 181 # servers never respond to websocket connection requests.
176 182 future = self.request_kernel_info()
177 183
178 184 def give_up():
179 185 """Don't wait forever for the kernel to reply"""
180 186 if future.done():
181 187 return
182 188 self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
183 189 future.set_result({})
184 190 loop = IOLoop.current()
185 191 loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
186 192 # actually wait for it
187 193 yield future
188 194
189 195 @gen.coroutine
190 196 def get(self, kernel_id):
191 197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
192 yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
193 199
194 200 def open(self, kernel_id):
195 super(ZMQChannelHandler, self).open()
201 super(ZMQChannelsHandler, self).open()
196 202 try:
197 203 self.create_stream()
198 204 except web.HTTPError as e:
199 205 self.log.error("Error opening stream: %s", e)
200 206 # WebSockets don't response to traditional error codes so we
201 207 # close the connection.
202 if not self.stream.closed():
203 self.stream.close()
208 for channel, stream in self.channels.items():
209 if not stream.closed():
210 stream.close()
204 211 self.close()
205 212 else:
206 self.zmq_stream.on_recv(self._on_zmq_reply)
213 for channel, stream in self.channels.items():
214 stream.on_recv_stream(self._on_zmq_reply)
207 215
208 216 def on_message(self, msg):
209 if self.zmq_stream is None:
210 return
211 elif self.zmq_stream.closed():
212 self.log.info("%s closed, closing websocket.", self)
213 self.close()
214 return
215 217 if isinstance(msg, bytes):
216 218 msg = deserialize_binary_message(msg)
217 219 else:
218 220 msg = json.loads(msg)
219 self.session.send(self.zmq_stream, msg)
221 channel = msg.pop('channel', None)
222 if channel is None:
223 self.log.warn("No channel specified, assuming shell: %s", msg)
224 channel = 'shell'
225 stream = self.channels[channel]
226 self.session.send(stream, msg)
220 227
221 228 def on_close(self):
222 # This method can be called twice, once by self.kernel_died and once
223 # from the WebSocket close event. If the WebSocket connection is
224 # closed before the ZMQ streams are setup, they could be None.
225 if self.zmq_stream is not None and not self.zmq_stream.closed():
226 self.zmq_stream.on_recv(None)
227 # close the socket directly, don't wait for the stream
228 socket = self.zmq_stream.socket
229 self.zmq_stream.close()
230 socket.close()
231
232
233 class IOPubHandler(ZMQChannelHandler):
234 channel = 'iopub'
235
236 def create_stream(self):
237 super(IOPubHandler, self).create_stream()
238 km = self.kernel_manager
239 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
240 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
241
242 def on_close(self):
243 229 km = self.kernel_manager
244 230 if self.kernel_id in km:
245 231 km.remove_restart_callback(
246 232 self.kernel_id, self.on_kernel_restarted,
247 233 )
248 234 km.remove_restart_callback(
249 235 self.kernel_id, self.on_restart_failed, 'dead',
250 236 )
251 super(IOPubHandler, self).on_close()
252
237 # This method can be called twice, once by self.kernel_died and once
238 # from the WebSocket close event. If the WebSocket connection is
239 # closed before the ZMQ streams are setup, they could be None.
240 for channel, stream in self.channels.items():
241 if stream is not None and not stream.closed():
242 stream.on_recv(None)
243 # close the socket directly, don't wait for the stream
244 socket = stream.socket
245 stream.close()
246 socket.close()
247
248 self.channels = {}
249
253 250 def _send_status_message(self, status):
254 251 msg = self.session.msg("status",
255 252 {'execution_state': status}
256 253 )
254 msg['channel'] = 'iopub'
257 255 self.write_message(json.dumps(msg, default=date_default))
258 256
259 257 def on_kernel_restarted(self):
260 258 logging.warn("kernel %s restarted", self.kernel_id)
261 259 self._send_status_message('restarting')
262 260
263 261 def on_restart_failed(self):
264 262 logging.error("kernel %s restarted failed!", self.kernel_id)
265 263 self._send_status_message('dead')
266
267 def on_message(self, msg):
268 """IOPub messages make no sense"""
269 pass
270
271
272 class ShellHandler(ZMQChannelHandler):
273 channel = 'shell'
274
275
276 class StdinHandler(ZMQChannelHandler):
277 channel = 'stdin'
278 264
279 265
280 266 #-----------------------------------------------------------------------------
281 267 # URL to handler mappings
282 268 #-----------------------------------------------------------------------------
283 269
284 270
285 271 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
286 272 _kernel_action_regex = r"(?P<action>restart|interrupt)"
287 273
288 274 default_handlers = [
289 275 (r"/api/kernels", MainKernelHandler),
290 276 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
291 277 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
292 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
293 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
294 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
278 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
295 279 ]
@@ -1,1074 +1,1052 b''
1 1 // Copyright (c) IPython Development Team.
2 2 // Distributed under the terms of the Modified BSD License.
3 3
4 4 define([
5 5 'base/js/namespace',
6 6 'jquery',
7 7 'base/js/utils',
8 8 './comm',
9 9 './serialize',
10 10 'widgets/js/init'
11 11 ], function(IPython, $, utils, comm, serialize, widgetmanager) {
12 12 "use strict";
13 13
14 14 /**
15 15 * A Kernel class to communicate with the Python kernel. This
16 16 * should generally not be constructed directly, but be created
17 17 * by. the `Session` object. Once created, this object should be
18 18 * used to communicate with the kernel.
19 19 *
20 20 * @class Kernel
21 21 * @param {string} kernel_service_url - the URL to access the kernel REST api
22 22 * @param {string} ws_url - the websockets URL
23 23 * @param {Notebook} notebook - notebook object
24 24 * @param {string} name - the kernel type (e.g. python3)
25 25 */
26 26 var Kernel = function (kernel_service_url, ws_url, notebook, name) {
27 27 this.events = notebook.events;
28 28
29 29 this.id = null;
30 30 this.name = name;
31
32 this.channels = {
33 'shell': null,
34 'iopub': null,
35 'stdin': null
36 };
31 this.ws = null;
37 32
38 33 this.kernel_service_url = kernel_service_url;
39 34 this.kernel_url = null;
40 35 this.ws_url = ws_url || IPython.utils.get_body_data("wsUrl");
41 36 if (!this.ws_url) {
42 37 // trailing 's' in https will become wss for secure web sockets
43 38 this.ws_url = location.protocol.replace('http', 'ws') + "//" + location.host;
44 39 }
45 40
46 41 this.username = "username";
47 42 this.session_id = utils.uuid();
48 43 this._msg_callbacks = {};
49 44 this.info_reply = {}; // kernel_info_reply stored here after starting
50 45
51 46 if (typeof(WebSocket) !== 'undefined') {
52 47 this.WebSocket = WebSocket;
53 48 } else if (typeof(MozWebSocket) !== 'undefined') {
54 49 this.WebSocket = MozWebSocket;
55 50 } else {
56 51 alert('Your browser does not have WebSocket support, please try Chrome, Safari or Firefox β‰₯ 6. Firefox 4 and 5 are also supported by you have to enable WebSockets in about:config.');
57 52 }
58 53
59 54 this.bind_events();
60 55 this.init_iopub_handlers();
61 56 this.comm_manager = new comm.CommManager(this);
62 57 this.widget_manager = new widgetmanager.WidgetManager(this.comm_manager, notebook);
63 58
64 59 this.last_msg_id = null;
65 60 this.last_msg_callbacks = {};
66 61
67 62 this._autorestart_attempt = 0;
68 63 this._reconnect_attempt = 0;
69 64 this.reconnect_limit = 7;
70 65 };
71 66
72 67 /**
73 68 * @function _get_msg
74 69 */
75 70 Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) {
76 71 var msg = {
77 72 header : {
78 73 msg_id : utils.uuid(),
79 74 username : this.username,
80 75 session : this.session_id,
81 76 msg_type : msg_type,
82 77 version : "5.0"
83 78 },
84 79 metadata : metadata || {},
85 80 content : content,
86 81 buffers : buffers || [],
87 82 parent_header : {}
88 83 };
89 84 return msg;
90 85 };
91 86
92 87 /**
93 88 * @function bind_events
94 89 */
95 90 Kernel.prototype.bind_events = function () {
96 91 var that = this;
97 92 this.events.on('send_input_reply.Kernel', function(evt, data) {
98 93 that.send_input_reply(data);
99 94 });
100 95
101 96 var record_status = function (evt, info) {
102 97 console.log('Kernel: ' + evt.type + ' (' + info.kernel.id + ')');
103 98 };
104 99
105 100 this.events.on('kernel_created.Kernel', record_status);
106 101 this.events.on('kernel_reconnecting.Kernel', record_status);
107 102 this.events.on('kernel_connected.Kernel', record_status);
108 103 this.events.on('kernel_starting.Kernel', record_status);
109 104 this.events.on('kernel_restarting.Kernel', record_status);
110 105 this.events.on('kernel_autorestarting.Kernel', record_status);
111 106 this.events.on('kernel_interrupting.Kernel', record_status);
112 107 this.events.on('kernel_disconnected.Kernel', record_status);
113 108 // these are commented out because they are triggered a lot, but can
114 109 // be uncommented for debugging purposes
115 110 //this.events.on('kernel_idle.Kernel', record_status);
116 111 //this.events.on('kernel_busy.Kernel', record_status);
117 112 this.events.on('kernel_ready.Kernel', record_status);
118 113 this.events.on('kernel_killed.Kernel', record_status);
119 114 this.events.on('kernel_dead.Kernel', record_status);
120 115
121 116 this.events.on('kernel_ready.Kernel', function () {
122 117 that._autorestart_attempt = 0;
123 118 });
124 119 this.events.on('kernel_connected.Kernel', function () {
125 120 that._reconnect_attempt = 0;
126 121 });
127 122 };
128 123
129 124 /**
130 125 * Initialize the iopub handlers.
131 126 *
132 127 * @function init_iopub_handlers
133 128 */
134 129 Kernel.prototype.init_iopub_handlers = function () {
135 130 var output_msg_types = ['stream', 'display_data', 'execute_result', 'error'];
136 131 this._iopub_handlers = {};
137 132 this.register_iopub_handler('status', $.proxy(this._handle_status_message, this));
138 133 this.register_iopub_handler('clear_output', $.proxy(this._handle_clear_output, this));
139 134 this.register_iopub_handler('execute_input', $.proxy(this._handle_input_message, this));
140 135
141 136 for (var i=0; i < output_msg_types.length; i++) {
142 137 this.register_iopub_handler(output_msg_types[i], $.proxy(this._handle_output_message, this));
143 138 }
144 139 };
145 140
146 141 /**
147 142 * GET /api/kernels
148 143 *
149 144 * Get the list of running kernels.
150 145 *
151 146 * @function list
152 147 * @param {function} [success] - function executed on ajax success
153 148 * @param {function} [error] - functon executed on ajax error
154 149 */
155 150 Kernel.prototype.list = function (success, error) {
156 151 $.ajax(this.kernel_service_url, {
157 152 processData: false,
158 153 cache: false,
159 154 type: "GET",
160 155 dataType: "json",
161 156 success: success,
162 157 error: this._on_error(error)
163 158 });
164 159 };
165 160
166 161 /**
167 162 * POST /api/kernels
168 163 *
169 164 * Start a new kernel.
170 165 *
171 166 * In general this shouldn't be used -- the kernel should be
172 167 * started through the session API. If you use this function and
173 168 * are also using the session API then your session and kernel
174 169 * WILL be out of sync!
175 170 *
176 171 * @function start
177 172 * @param {params} [Object] - parameters to include in the query string
178 173 * @param {function} [success] - function executed on ajax success
179 174 * @param {function} [error] - functon executed on ajax error
180 175 */
181 176 Kernel.prototype.start = function (params, success, error) {
182 177 var url = this.kernel_service_url;
183 178 var qs = $.param(params || {}); // query string for sage math stuff
184 179 if (qs !== "") {
185 180 url = url + "?" + qs;
186 181 }
187 182
188 183 var that = this;
189 184 var on_success = function (data, status, xhr) {
190 185 that.events.trigger('kernel_created.Kernel', {kernel: that});
191 186 that._kernel_created(data);
192 187 if (success) {
193 188 success(data, status, xhr);
194 189 }
195 190 };
196 191
197 192 $.ajax(url, {
198 193 processData: false,
199 194 cache: false,
200 195 type: "POST",
201 196 data: JSON.stringify({name: this.name}),
202 197 dataType: "json",
203 198 success: this._on_success(on_success),
204 199 error: this._on_error(error)
205 200 });
206 201
207 202 return url;
208 203 };
209 204
210 205 /**
211 206 * GET /api/kernels/[:kernel_id]
212 207 *
213 208 * Get information about the kernel.
214 209 *
215 210 * @function get_info
216 211 * @param {function} [success] - function executed on ajax success
217 212 * @param {function} [error] - functon executed on ajax error
218 213 */
219 214 Kernel.prototype.get_info = function (success, error) {
220 215 $.ajax(this.kernel_url, {
221 216 processData: false,
222 217 cache: false,
223 218 type: "GET",
224 219 dataType: "json",
225 220 success: this._on_success(success),
226 221 error: this._on_error(error)
227 222 });
228 223 };
229 224
230 225 /**
231 226 * DELETE /api/kernels/[:kernel_id]
232 227 *
233 228 * Shutdown the kernel.
234 229 *
235 230 * If you are also using sessions, then this function shoul NOT be
236 231 * used. Instead, use Session.delete. Otherwise, the session and
237 232 * kernel WILL be out of sync.
238 233 *
239 234 * @function kill
240 235 * @param {function} [success] - function executed on ajax success
241 236 * @param {function} [error] - functon executed on ajax error
242 237 */
243 238 Kernel.prototype.kill = function (success, error) {
244 239 this.events.trigger('kernel_killed.Kernel', {kernel: this});
245 240 this._kernel_dead();
246 241 $.ajax(this.kernel_url, {
247 242 processData: false,
248 243 cache: false,
249 244 type: "DELETE",
250 245 dataType: "json",
251 246 success: this._on_success(success),
252 247 error: this._on_error(error)
253 248 });
254 249 };
255 250
256 251 /**
257 252 * POST /api/kernels/[:kernel_id]/interrupt
258 253 *
259 254 * Interrupt the kernel.
260 255 *
261 256 * @function interrupt
262 257 * @param {function} [success] - function executed on ajax success
263 258 * @param {function} [error] - functon executed on ajax error
264 259 */
265 260 Kernel.prototype.interrupt = function (success, error) {
266 261 this.events.trigger('kernel_interrupting.Kernel', {kernel: this});
267 262
268 263 var that = this;
269 264 var on_success = function (data, status, xhr) {
270 265 /**
271 266 * get kernel info so we know what state the kernel is in
272 267 */
273 268 that.kernel_info();
274 269 if (success) {
275 270 success(data, status, xhr);
276 271 }
277 272 };
278 273
279 274 var url = utils.url_join_encode(this.kernel_url, 'interrupt');
280 275 $.ajax(url, {
281 276 processData: false,
282 277 cache: false,
283 278 type: "POST",
284 279 dataType: "json",
285 280 success: this._on_success(on_success),
286 281 error: this._on_error(error)
287 282 });
288 283 };
289 284
290 285 Kernel.prototype.restart = function (success, error) {
291 286 /**
292 287 * POST /api/kernels/[:kernel_id]/restart
293 288 *
294 289 * Restart the kernel.
295 290 *
296 291 * @function interrupt
297 292 * @param {function} [success] - function executed on ajax success
298 293 * @param {function} [error] - functon executed on ajax error
299 294 */
300 295 this.events.trigger('kernel_restarting.Kernel', {kernel: this});
301 296 this.stop_channels();
302 297
303 298 var that = this;
304 299 var on_success = function (data, status, xhr) {
305 300 that.events.trigger('kernel_created.Kernel', {kernel: that});
306 301 that._kernel_created(data);
307 302 if (success) {
308 303 success(data, status, xhr);
309 304 }
310 305 };
311 306
312 307 var on_error = function (xhr, status, err) {
313 308 that.events.trigger('kernel_dead.Kernel', {kernel: that});
314 309 that._kernel_dead();
315 310 if (error) {
316 311 error(xhr, status, err);
317 312 }
318 313 };
319 314
320 315 var url = utils.url_join_encode(this.kernel_url, 'restart');
321 316 $.ajax(url, {
322 317 processData: false,
323 318 cache: false,
324 319 type: "POST",
325 320 dataType: "json",
326 321 success: this._on_success(on_success),
327 322 error: this._on_error(on_error)
328 323 });
329 324 };
330 325
331 326 Kernel.prototype.reconnect = function () {
332 327 /**
333 328 * Reconnect to a disconnected kernel. This is not actually a
334 329 * standard HTTP request, but useful function nonetheless for
335 330 * reconnecting to the kernel if the connection is somehow lost.
336 331 *
337 332 * @function reconnect
338 333 */
339 334 if (this.is_connected()) {
340 335 return;
341 336 }
342 337 this._reconnect_attempt = this._reconnect_attempt + 1;
343 338 this.events.trigger('kernel_reconnecting.Kernel', {
344 339 kernel: this,
345 340 attempt: this._reconnect_attempt,
346 341 });
347 342 this.start_channels();
348 343 };
349 344
350 345 Kernel.prototype._on_success = function (success) {
351 346 /**
352 347 * Handle a successful AJAX request by updating the kernel id and
353 348 * name from the response, and then optionally calling a provided
354 349 * callback.
355 350 *
356 351 * @function _on_success
357 352 * @param {function} success - callback
358 353 */
359 354 var that = this;
360 355 return function (data, status, xhr) {
361 356 if (data) {
362 357 that.id = data.id;
363 358 that.name = data.name;
364 359 }
365 360 that.kernel_url = utils.url_join_encode(that.kernel_service_url, that.id);
366 361 if (success) {
367 362 success(data, status, xhr);
368 363 }
369 364 };
370 365 };
371 366
372 367 Kernel.prototype._on_error = function (error) {
373 368 /**
374 369 * Handle a failed AJAX request by logging the error message, and
375 370 * then optionally calling a provided callback.
376 371 *
377 372 * @function _on_error
378 373 * @param {function} error - callback
379 374 */
380 375 return function (xhr, status, err) {
381 376 utils.log_ajax_error(xhr, status, err);
382 377 if (error) {
383 378 error(xhr, status, err);
384 379 }
385 380 };
386 381 };
387 382
388 383 Kernel.prototype._kernel_created = function (data) {
389 384 /**
390 385 * Perform necessary tasks once the kernel has been started,
391 386 * including actually connecting to the kernel.
392 387 *
393 388 * @function _kernel_created
394 389 * @param {Object} data - information about the kernel including id
395 390 */
396 391 this.id = data.id;
397 392 this.kernel_url = utils.url_join_encode(this.kernel_service_url, this.id);
398 393 this.start_channels();
399 394 };
400 395
401 396 Kernel.prototype._kernel_connected = function () {
402 397 /**
403 398 * Perform necessary tasks once the connection to the kernel has
404 399 * been established. This includes requesting information about
405 400 * the kernel.
406 401 *
407 402 * @function _kernel_connected
408 403 */
409 404 this.events.trigger('kernel_connected.Kernel', {kernel: this});
410 405 this.events.trigger('kernel_starting.Kernel', {kernel: this});
411 406 // get kernel info so we know what state the kernel is in
412 407 var that = this;
413 408 this.kernel_info(function (reply) {
414 409 that.info_reply = reply.content;
415 410 that.events.trigger('kernel_ready.Kernel', {kernel: that});
416 411 });
417 412 };
418 413
419 414 Kernel.prototype._kernel_dead = function () {
420 415 /**
421 416 * Perform necessary tasks after the kernel has died. This closing
422 417 * communication channels to the kernel if they are still somehow
423 418 * open.
424 419 *
425 420 * @function _kernel_dead
426 421 */
427 422 this.stop_channels();
428 423 };
429 424
430 425 Kernel.prototype.start_channels = function () {
431 426 /**
432 * Start the `shell`and `iopub` channels.
427 * Start the websocket channels.
433 428 * Will stop and restart them if they already exist.
434 429 *
435 430 * @function start_channels
436 431 */
437 432 var that = this;
438 433 this.stop_channels();
439 434 var ws_host_url = this.ws_url + this.kernel_url;
440 435
441 436 console.log("Starting WebSockets:", ws_host_url);
442 437
443 var channel_url = function(channel) {
444 return [
438 this.ws = new this.WebSocket([
445 439 that.ws_url,
446 utils.url_join_encode(that.kernel_url, channel),
440 utils.url_join_encode(that.kernel_url, 'channels'),
447 441 "?session_id=" + that.session_id
448 ].join('');
449 };
450 this.channels.shell = new this.WebSocket(channel_url("shell"));
451 this.channels.stdin = new this.WebSocket(channel_url("stdin"));
452 this.channels.iopub = new this.WebSocket(channel_url("iopub"));
442 ].join('')
443 );
453 444
454 445 var already_called_onclose = false; // only alert once
455 446 var ws_closed_early = function(evt){
456 447 if (already_called_onclose){
457 448 return;
458 449 }
459 450 already_called_onclose = true;
460 451 if ( ! evt.wasClean ){
461 452 // If the websocket was closed early, that could mean
462 453 // that the kernel is actually dead. Try getting
463 454 // information about the kernel from the API call --
464 455 // if that fails, then assume the kernel is dead,
465 456 // otherwise just follow the typical websocket closed
466 457 // protocol.
467 458 that.get_info(function () {
468 459 that._ws_closed(ws_host_url, false);
469 460 }, function () {
470 461 that.events.trigger('kernel_dead.Kernel', {kernel: that});
471 462 that._kernel_dead();
472 463 });
473 464 }
474 465 };
475 466 var ws_closed_late = function(evt){
476 467 if (already_called_onclose){
477 468 return;
478 469 }
479 470 already_called_onclose = true;
480 471 if ( ! evt.wasClean ){
481 472 that._ws_closed(ws_host_url, false);
482 473 }
483 474 };
484 475 var ws_error = function(evt){
485 476 if (already_called_onclose){
486 477 return;
487 478 }
488 479 already_called_onclose = true;
489 480 that._ws_closed(ws_host_url, true);
490 481 };
491 482
492 for (var c in this.channels) {
493 this.channels[c].onopen = $.proxy(this._ws_opened, this);
494 this.channels[c].onclose = ws_closed_early;
495 this.channels[c].onerror = ws_error;
496 }
483 this.ws.onopen = $.proxy(this._ws_opened, this);
484 this.ws.onclose = ws_closed_early;
485 this.ws.onerror = ws_error;
497 486 // switch from early-close to late-close message after 1s
498 487 setTimeout(function() {
499 for (var c in that.channels) {
500 if (that.channels[c] !== null) {
501 that.channels[c].onclose = ws_closed_late;
502 }
488 if (that.ws !== null) {
489 that.ws.onclose = ws_closed_late;
503 490 }
504 491 }, 1000);
505 this.channels.shell.onmessage = $.proxy(this._handle_shell_reply, this);
506 this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this);
507 this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this);
492 this.ws.onmessage = $.proxy(this._handle_ws_message, this);
508 493 };
509 494
510 495 Kernel.prototype._ws_opened = function (evt) {
511 496 /**
512 497 * Handle a websocket entering the open state,
513 * signaling that the kernel is connected when all channels are open.
498 * signaling that the kernel is connected when websocket is open.
514 499 *
515 500 * @function _ws_opened
516 501 */
517 502 if (this.is_connected()) {
518 503 // all events ready, trigger started event.
519 504 this._kernel_connected();
520 505 }
521 506 };
522 507
523 508 Kernel.prototype._ws_closed = function(ws_url, error) {
524 509 /**
525 * Handle a websocket entering the closed state. This closes the
526 * other communication channels if they are open. If the websocket
510 * Handle a websocket entering the closed state. If the websocket
527 511 * was not closed due to an error, try to reconnect to the kernel.
528 512 *
529 513 * @function _ws_closed
530 514 * @param {string} ws_url - the websocket url
531 515 * @param {bool} error - whether the connection was closed due to an error
532 516 */
533 517 this.stop_channels();
534 518
535 519 this.events.trigger('kernel_disconnected.Kernel', {kernel: this});
536 520 if (error) {
537 521 console.log('WebSocket connection failed: ', ws_url);
538 522 this.events.trigger('kernel_connection_failed.Kernel', {kernel: this, ws_url: ws_url, attempt: this._reconnect_attempt});
539 523 }
540 524 this._schedule_reconnect();
541 525 };
542 526
543 527 Kernel.prototype._schedule_reconnect = function () {
544 528 /**
545 529 * function to call when kernel connection is lost
546 530 * schedules reconnect, or fires 'connection_dead' if reconnect limit is hit
547 531 */
548 532 if (this._reconnect_attempt < this.reconnect_limit) {
549 533 var timeout = Math.pow(2, this._reconnect_attempt);
550 534 console.log("Connection lost, reconnecting in " + timeout + " seconds.");
551 535 setTimeout($.proxy(this.reconnect, this), 1e3 * timeout);
552 536 } else {
553 537 this.events.trigger('kernel_connection_dead.Kernel', {
554 538 kernel: this,
555 539 reconnect_attempt: this._reconnect_attempt,
556 540 });
557 541 console.log("Failed to reconnect, giving up.");
558 542 }
559 543 };
560 544
561 545 Kernel.prototype.stop_channels = function () {
562 546 /**
563 * Close the websocket channels. After successful close, the value
564 * in `this.channels[channel_name]` will be null.
547 * Close the websocket. After successful close, the value
548 * in `this.ws` will be null.
565 549 *
566 550 * @function stop_channels
567 551 */
568 552 var that = this;
569 var close = function (c) {
570 return function () {
571 if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) {
572 that.channels[c] = null;
573 }
574 };
553 var close = function () {
554 if (that.ws && that.ws.readyState === WebSocket.CLOSED) {
555 that.ws = null;
556 }
575 557 };
576 for (var c in this.channels) {
577 if ( this.channels[c] !== null ) {
578 if (this.channels[c].readyState === WebSocket.OPEN) {
579 this.channels[c].onclose = close(c);
580 this.channels[c].close();
581 } else {
582 close(c)();
583 }
558 if (this.ws !== null) {
559 if (this.ws.readyState === WebSocket.OPEN) {
560 this.ws.onclose = close;
561 this.ws.close();
562 } else {
563 close();
584 564 }
585 565 }
586 566 };
587 567
588 568 Kernel.prototype.is_connected = function () {
589 569 /**
590 570 * Check whether there is a connection to the kernel. This
591 * function only returns true if all channel objects have been
592 * created and have a state of WebSocket.OPEN.
571 * function only returns true if websocket has been
572 * created and has a state of WebSocket.OPEN.
593 573 *
594 574 * @function is_connected
595 575 * @returns {bool} - whether there is a connection
596 576 */
597 for (var c in this.channels) {
598 // if any channel is not ready, then we're not connected
599 if (this.channels[c] === null) {
600 return false;
601 }
602 if (this.channels[c].readyState !== WebSocket.OPEN) {
603 return false;
604 }
577 // if any channel is not ready, then we're not connected
578 if (this.ws === null) {
579 return false;
580 }
581 if (this.ws.readyState !== WebSocket.OPEN) {
582 return false;
605 583 }
606 584 return true;
607 585 };
608 586
609 587 Kernel.prototype.is_fully_disconnected = function () {
610 588 /**
611 589 * Check whether the connection to the kernel has been completely
612 590 * severed. This function only returns true if all channel objects
613 591 * are null.
614 592 *
615 593 * @function is_fully_disconnected
616 594 * @returns {bool} - whether the kernel is fully disconnected
617 595 */
618 for (var c in this.channels) {
619 if (this.channels[c] === null) {
620 return true;
621 }
622 }
623 return false;
596 return (this.ws === null);
624 597 };
625 598
626 599 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
627 600 /**
628 601 * Send a message on the Kernel's shell channel
629 602 *
630 603 * @function send_shell_message
631 604 */
632 605 if (!this.is_connected()) {
633 606 throw new Error("kernel is not connected");
634 607 }
635 608 var msg = this._get_msg(msg_type, content, metadata, buffers);
636 this.channels.shell.send(serialize.serialize(msg));
609 msg.channel = 'shell';
610 this.ws.send(serialize.serialize(msg));
637 611 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
638 612 return msg.header.msg_id;
639 613 };
640 614
641 615 Kernel.prototype.kernel_info = function (callback) {
642 616 /**
643 617 * Get kernel info
644 618 *
645 619 * @function kernel_info
646 620 * @param callback {function}
647 621 *
648 622 * When calling this method, pass a callback function that expects one argument.
649 623 * The callback will be passed the complete `kernel_info_reply` message documented
650 624 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#kernel-info)
651 625 */
652 626 var callbacks;
653 627 if (callback) {
654 628 callbacks = { shell : { reply : callback } };
655 629 }
656 630 return this.send_shell_message("kernel_info_request", {}, callbacks);
657 631 };
658 632
659 633 Kernel.prototype.inspect = function (code, cursor_pos, callback) {
660 634 /**
661 635 * Get info on an object
662 636 *
663 637 * When calling this method, pass a callback function that expects one argument.
664 638 * The callback will be passed the complete `inspect_reply` message documented
665 639 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#object-information)
666 640 *
667 641 * @function inspect
668 642 * @param code {string}
669 643 * @param cursor_pos {integer}
670 644 * @param callback {function}
671 645 */
672 646 var callbacks;
673 647 if (callback) {
674 648 callbacks = { shell : { reply : callback } };
675 649 }
676 650
677 651 var content = {
678 652 code : code,
679 653 cursor_pos : cursor_pos,
680 654 detail_level : 0
681 655 };
682 656 return this.send_shell_message("inspect_request", content, callbacks);
683 657 };
684 658
685 659 Kernel.prototype.execute = function (code, callbacks, options) {
686 660 /**
687 661 * Execute given code into kernel, and pass result to callback.
688 662 *
689 663 * @async
690 664 * @function execute
691 665 * @param {string} code
692 666 * @param [callbacks] {Object} With the following keys (all optional)
693 667 * @param callbacks.shell.reply {function}
694 668 * @param callbacks.shell.payload.[payload_name] {function}
695 669 * @param callbacks.iopub.output {function}
696 670 * @param callbacks.iopub.clear_output {function}
697 671 * @param callbacks.input {function}
698 672 * @param {object} [options]
699 673 * @param [options.silent=false] {Boolean}
700 674 * @param [options.user_expressions=empty_dict] {Dict}
701 675 * @param [options.allow_stdin=false] {Boolean} true|false
702 676 *
703 677 * @example
704 678 *
705 679 * The options object should contain the options for the execute
706 680 * call. Its default values are:
707 681 *
708 682 * options = {
709 683 * silent : true,
710 684 * user_expressions : {},
711 685 * allow_stdin : false
712 686 * }
713 687 *
714 688 * When calling this method pass a callbacks structure of the
715 689 * form:
716 690 *
717 691 * callbacks = {
718 692 * shell : {
719 693 * reply : execute_reply_callback,
720 694 * payload : {
721 695 * set_next_input : set_next_input_callback,
722 696 * }
723 697 * },
724 698 * iopub : {
725 699 * output : output_callback,
726 700 * clear_output : clear_output_callback,
727 701 * },
728 702 * input : raw_input_callback
729 703 * }
730 704 *
731 705 * Each callback will be passed the entire message as a single
732 706 * arugment. Payload handlers will be passed the corresponding
733 707 * payload and the execute_reply message.
734 708 */
735 709 var content = {
736 710 code : code,
737 711 silent : true,
738 712 store_history : false,
739 713 user_expressions : {},
740 714 allow_stdin : false
741 715 };
742 716 callbacks = callbacks || {};
743 717 if (callbacks.input !== undefined) {
744 718 content.allow_stdin = true;
745 719 }
746 720 $.extend(true, content, options);
747 721 this.events.trigger('execution_request.Kernel', {kernel: this, content: content});
748 722 return this.send_shell_message("execute_request", content, callbacks);
749 723 };
750 724
751 725 /**
752 726 * When calling this method, pass a function to be called with the
753 727 * `complete_reply` message as its only argument when it arrives.
754 728 *
755 729 * `complete_reply` is documented
756 730 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#complete)
757 731 *
758 732 * @function complete
759 733 * @param code {string}
760 734 * @param cursor_pos {integer}
761 735 * @param callback {function}
762 736 */
763 737 Kernel.prototype.complete = function (code, cursor_pos, callback) {
764 738 var callbacks;
765 739 if (callback) {
766 740 callbacks = { shell : { reply : callback } };
767 741 }
768 742 var content = {
769 743 code : code,
770 744 cursor_pos : cursor_pos
771 745 };
772 746 return this.send_shell_message("complete_request", content, callbacks);
773 747 };
774 748
775 749 /**
776 750 * @function send_input_reply
777 751 */
778 752 Kernel.prototype.send_input_reply = function (input) {
779 753 if (!this.is_connected()) {
780 754 throw new Error("kernel is not connected");
781 755 }
782 756 var content = {
783 757 value : input
784 758 };
785 759 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
786 760 var msg = this._get_msg("input_reply", content);
787 this.channels.stdin.send(serialize.serialize(msg));
761 msg.channel = 'stdin';
762 this.ws.send(serialize.serialize(msg));
788 763 return msg.header.msg_id;
789 764 };
790 765
791 766 /**
792 767 * @function register_iopub_handler
793 768 */
794 769 Kernel.prototype.register_iopub_handler = function (msg_type, callback) {
795 770 this._iopub_handlers[msg_type] = callback;
796 771 };
797 772
798 773 /**
799 774 * Get the iopub handler for a specific message type.
800 775 *
801 776 * @function get_iopub_handler
802 777 */
803 778 Kernel.prototype.get_iopub_handler = function (msg_type) {
804 779 return this._iopub_handlers[msg_type];
805 780 };
806 781
807 782 /**
808 783 * Get callbacks for a specific message.
809 784 *
810 785 * @function get_callbacks_for_msg
811 786 */
812 787 Kernel.prototype.get_callbacks_for_msg = function (msg_id) {
813 788 if (msg_id == this.last_msg_id) {
814 789 return this.last_msg_callbacks;
815 790 } else {
816 791 return this._msg_callbacks[msg_id];
817 792 }
818 793 };
819 794
820 795 /**
821 796 * Clear callbacks for a specific message.
822 797 *
823 798 * @function clear_callbacks_for_msg
824 799 */
825 800 Kernel.prototype.clear_callbacks_for_msg = function (msg_id) {
826 801 if (this._msg_callbacks[msg_id] !== undefined ) {
827 802 delete this._msg_callbacks[msg_id];
828 803 }
829 804 };
830 805
831 806 /**
832 807 * @function _finish_shell
833 808 */
834 809 Kernel.prototype._finish_shell = function (msg_id) {
835 810 var callbacks = this._msg_callbacks[msg_id];
836 811 if (callbacks !== undefined) {
837 812 callbacks.shell_done = true;
838 813 if (callbacks.iopub_done) {
839 814 this.clear_callbacks_for_msg(msg_id);
840 815 }
841 816 }
842 817 };
843 818
844 819 /**
845 820 * @function _finish_iopub
846 821 */
847 822 Kernel.prototype._finish_iopub = function (msg_id) {
848 823 var callbacks = this._msg_callbacks[msg_id];
849 824 if (callbacks !== undefined) {
850 825 callbacks.iopub_done = true;
851 826 if (callbacks.shell_done) {
852 827 this.clear_callbacks_for_msg(msg_id);
853 828 }
854 829 }
855 830 };
856 831
857 832 /**
858 833 * Set callbacks for a particular message.
859 834 * Callbacks should be a struct of the following form:
860 835 * shell : {
861 836 *
862 837 * }
863 838 *
864 839 * @function set_callbacks_for_msg
865 840 */
866 841 Kernel.prototype.set_callbacks_for_msg = function (msg_id, callbacks) {
867 842 this.last_msg_id = msg_id;
868 843 if (callbacks) {
869 844 // shallow-copy mapping, because we will modify it at the top level
870 845 var cbcopy = this._msg_callbacks[msg_id] = this.last_msg_callbacks = {};
871 846 cbcopy.shell = callbacks.shell;
872 847 cbcopy.iopub = callbacks.iopub;
873 848 cbcopy.input = callbacks.input;
874 849 cbcopy.shell_done = (!callbacks.shell);
875 850 cbcopy.iopub_done = (!callbacks.iopub);
876 851 } else {
877 852 this.last_msg_callbacks = {};
878 853 }
879 854 };
880
881 /**
882 * @function _handle_shell_reply
883 */
884 Kernel.prototype._handle_shell_reply = function (e) {
885 serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
855
856 Kernel.prototype._handle_ws_message = function (e) {
857 serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
886 858 };
887 859
888 Kernel.prototype._finish_shell_reply = function (reply) {
860 Kernel.prototype._finish_ws_message = function (msg) {
861 switch (msg.channel) {
862 case 'shell':
863 this._handle_shell_reply(msg);
864 break;
865 case 'iopub':
866 this._handle_iopub_message(msg);
867 break;
868 case 'stdin':
869 this._handle_input_request(msg);
870 break;
871 default:
872 console.error("unrecognized message channel", msg.channel, msg);
873 }
874 };
875
876 Kernel.prototype._handle_shell_reply = function (reply) {
889 877 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
890 878 var content = reply.content;
891 879 var metadata = reply.metadata;
892 880 var parent_id = reply.parent_header.msg_id;
893 881 var callbacks = this.get_callbacks_for_msg(parent_id);
894 882 if (!callbacks || !callbacks.shell) {
895 883 return;
896 884 }
897 885 var shell_callbacks = callbacks.shell;
898 886
899 887 // signal that shell callbacks are done
900 888 this._finish_shell(parent_id);
901 889
902 890 if (shell_callbacks.reply !== undefined) {
903 891 shell_callbacks.reply(reply);
904 892 }
905 893 if (content.payload && shell_callbacks.payload) {
906 894 this._handle_payloads(content.payload, shell_callbacks.payload, reply);
907 895 }
908 896 };
909 897
910 898 /**
911 899 * @function _handle_payloads
912 900 */
913 901 Kernel.prototype._handle_payloads = function (payloads, payload_callbacks, msg) {
914 902 var l = payloads.length;
915 903 // Payloads are handled by triggering events because we don't want the Kernel
916 904 // to depend on the Notebook or Pager classes.
917 905 for (var i=0; i<l; i++) {
918 906 var payload = payloads[i];
919 907 var callback = payload_callbacks[payload.source];
920 908 if (callback) {
921 909 callback(payload, msg);
922 910 }
923 911 }
924 912 };
925 913
926 914 /**
927 915 * @function _handle_status_message
928 916 */
929 917 Kernel.prototype._handle_status_message = function (msg) {
930 918 var execution_state = msg.content.execution_state;
931 919 var parent_id = msg.parent_header.msg_id;
932 920
933 921 // dispatch status msg callbacks, if any
934 922 var callbacks = this.get_callbacks_for_msg(parent_id);
935 923 if (callbacks && callbacks.iopub && callbacks.iopub.status) {
936 924 try {
937 925 callbacks.iopub.status(msg);
938 926 } catch (e) {
939 927 console.log("Exception in status msg handler", e, e.stack);
940 928 }
941 929 }
942 930
943 931 if (execution_state === 'busy') {
944 932 this.events.trigger('kernel_busy.Kernel', {kernel: this});
945 933
946 934 } else if (execution_state === 'idle') {
947 935 // signal that iopub callbacks are (probably) done
948 936 // async output may still arrive,
949 937 // but only for the most recent request
950 938 this._finish_iopub(parent_id);
951 939
952 940 // trigger status_idle event
953 941 this.events.trigger('kernel_idle.Kernel', {kernel: this});
954 942
955 943 } else if (execution_state === 'starting') {
956 944 this.events.trigger('kernel_starting.Kernel', {kernel: this});
957 945 var that = this;
958 946 this.kernel_info(function (reply) {
959 947 that.info_reply = reply.content;
960 948 that.events.trigger('kernel_ready.Kernel', {kernel: that});
961 949 });
962 950
963 951 } else if (execution_state === 'restarting') {
964 952 // autorestarting is distinct from restarting,
965 953 // in that it means the kernel died and the server is restarting it.
966 954 // kernel_restarting sets the notification widget,
967 955 // autorestart shows the more prominent dialog.
968 956 this._autorestart_attempt = this._autorestart_attempt + 1;
969 957 this.events.trigger('kernel_restarting.Kernel', {kernel: this});
970 958 this.events.trigger('kernel_autorestarting.Kernel', {kernel: this, attempt: this._autorestart_attempt});
971 959
972 960 } else if (execution_state === 'dead') {
973 961 this.events.trigger('kernel_dead.Kernel', {kernel: this});
974 962 this._kernel_dead();
975 963 }
976 964 };
977 965
978 966 /**
979 967 * Handle clear_output message
980 968 *
981 969 * @function _handle_clear_output
982 970 */
983 971 Kernel.prototype._handle_clear_output = function (msg) {
984 972 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
985 973 if (!callbacks || !callbacks.iopub) {
986 974 return;
987 975 }
988 976 var callback = callbacks.iopub.clear_output;
989 977 if (callback) {
990 978 callback(msg);
991 979 }
992 980 };
993 981
994 982 /**
995 983 * handle an output message (execute_result, display_data, etc.)
996 984 *
997 985 * @function _handle_output_message
998 986 */
999 987 Kernel.prototype._handle_output_message = function (msg) {
1000 988 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
1001 989 if (!callbacks || !callbacks.iopub) {
1002 990 // The message came from another client. Let the UI decide what to
1003 991 // do with it.
1004 992 this.events.trigger('received_unsolicited_message.Kernel', msg);
1005 993 return;
1006 994 }
1007 995 var callback = callbacks.iopub.output;
1008 996 if (callback) {
1009 997 callback(msg);
1010 998 }
1011 999 };
1012 1000
1013 1001 /**
1014 1002 * Handle an input message (execute_input).
1015 1003 *
1016 1004 * @function _handle_input message
1017 1005 */
1018 1006 Kernel.prototype._handle_input_message = function (msg) {
1019 1007 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
1020 1008 if (!callbacks) {
1021 1009 // The message came from another client. Let the UI decide what to
1022 1010 // do with it.
1023 1011 this.events.trigger('received_unsolicited_message.Kernel', msg);
1024 1012 }
1025 1013 };
1026 1014
1027 1015 /**
1028 1016 * Dispatch IOPub messages to respective handlers. Each message
1029 1017 * type should have a handler.
1030 1018 *
1031 1019 * @function _handle_iopub_message
1032 1020 */
1033 Kernel.prototype._handle_iopub_message = function (e) {
1034 serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
1035 };
1036
1037
1038 Kernel.prototype._finish_iopub_message = function (msg) {
1021 Kernel.prototype._handle_iopub_message = function (msg) {
1039 1022 var handler = this.get_iopub_handler(msg.header.msg_type);
1040 1023 if (handler !== undefined) {
1041 1024 handler(msg);
1042 1025 }
1043 1026 };
1044 1027
1045 1028 /**
1046 1029 * @function _handle_input_request
1047 1030 */
1048 Kernel.prototype._handle_input_request = function (e) {
1049 serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
1050 };
1051
1052
1053 Kernel.prototype._finish_input_request = function (request) {
1031 Kernel.prototype._handle_input_request = function (request) {
1054 1032 var header = request.header;
1055 1033 var content = request.content;
1056 1034 var metadata = request.metadata;
1057 1035 var msg_type = header.msg_type;
1058 1036 if (msg_type !== 'input_request') {
1059 1037 console.log("Invalid input request!", request);
1060 1038 return;
1061 1039 }
1062 1040 var callbacks = this.get_callbacks_for_msg(request.parent_header.msg_id);
1063 1041 if (callbacks) {
1064 1042 if (callbacks.input) {
1065 1043 callbacks.input(request);
1066 1044 }
1067 1045 }
1068 1046 };
1069 1047
1070 1048 // Backwards compatability.
1071 1049 IPython.Kernel = Kernel;
1072 1050
1073 1051 return {'Kernel': Kernel};
1074 1052 });
General Comments 0
You need to be logged in to leave comments. Login now