##// END OF EJS Templates
Fix race condition in javascript kernel message processing...
Jason Grout -
Show More
@@ -1,1052 +1,1056 b''
1 1 // Copyright (c) IPython Development Team.
2 2 // Distributed under the terms of the Modified BSD License.
3 3
4 4 define([
5 5 'base/js/namespace',
6 6 'jquery',
7 7 'base/js/utils',
8 8 './comm',
9 9 './serialize',
10 10 'widgets/js/init'
11 11 ], function(IPython, $, utils, comm, serialize, widgetmanager) {
12 12 "use strict";
13 13
14 14 /**
15 15 * A Kernel class to communicate with the Python kernel. This
16 16 * should generally not be constructed directly, but be created
17 17 * by. the `Session` object. Once created, this object should be
18 18 * used to communicate with the kernel.
19 19 *
20 20 * @class Kernel
21 21 * @param {string} kernel_service_url - the URL to access the kernel REST api
22 22 * @param {string} ws_url - the websockets URL
23 23 * @param {Notebook} notebook - notebook object
24 24 * @param {string} name - the kernel type (e.g. python3)
25 25 */
26 26 var Kernel = function (kernel_service_url, ws_url, notebook, name) {
27 27 this.events = notebook.events;
28 28
29 29 this.id = null;
30 30 this.name = name;
31 31 this.ws = null;
32 32
33 33 this.kernel_service_url = kernel_service_url;
34 34 this.kernel_url = null;
35 35 this.ws_url = ws_url || IPython.utils.get_body_data("wsUrl");
36 36 if (!this.ws_url) {
37 37 // trailing 's' in https will become wss for secure web sockets
38 38 this.ws_url = location.protocol.replace('http', 'ws') + "//" + location.host;
39 39 }
40 40
41 41 this.username = "username";
42 42 this.session_id = utils.uuid();
43 43 this._msg_callbacks = {};
44 this._msg_queue = Promise.resolve();
44 45 this.info_reply = {}; // kernel_info_reply stored here after starting
45 46
46 47 if (typeof(WebSocket) !== 'undefined') {
47 48 this.WebSocket = WebSocket;
48 49 } else if (typeof(MozWebSocket) !== 'undefined') {
49 50 this.WebSocket = MozWebSocket;
50 51 } else {
51 52 alert('Your browser does not have WebSocket support, please try Chrome, Safari or Firefox β‰₯ 6. Firefox 4 and 5 are also supported by you have to enable WebSockets in about:config.');
52 53 }
53 54
54 55 this.bind_events();
55 56 this.init_iopub_handlers();
56 57 this.comm_manager = new comm.CommManager(this);
57 58 this.widget_manager = new widgetmanager.WidgetManager(this.comm_manager, notebook);
58 59
59 60 this.last_msg_id = null;
60 61 this.last_msg_callbacks = {};
61 62
62 63 this._autorestart_attempt = 0;
63 64 this._reconnect_attempt = 0;
64 65 this.reconnect_limit = 7;
65 66 };
66 67
67 68 /**
68 69 * @function _get_msg
69 70 */
70 71 Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) {
71 72 var msg = {
72 73 header : {
73 74 msg_id : utils.uuid(),
74 75 username : this.username,
75 76 session : this.session_id,
76 77 msg_type : msg_type,
77 78 version : "5.0"
78 79 },
79 80 metadata : metadata || {},
80 81 content : content,
81 82 buffers : buffers || [],
82 83 parent_header : {}
83 84 };
84 85 return msg;
85 86 };
86 87
87 88 /**
88 89 * @function bind_events
89 90 */
90 91 Kernel.prototype.bind_events = function () {
91 92 var that = this;
92 93 this.events.on('send_input_reply.Kernel', function(evt, data) {
93 94 that.send_input_reply(data);
94 95 });
95 96
96 97 var record_status = function (evt, info) {
97 98 console.log('Kernel: ' + evt.type + ' (' + info.kernel.id + ')');
98 99 };
99 100
100 101 this.events.on('kernel_created.Kernel', record_status);
101 102 this.events.on('kernel_reconnecting.Kernel', record_status);
102 103 this.events.on('kernel_connected.Kernel', record_status);
103 104 this.events.on('kernel_starting.Kernel', record_status);
104 105 this.events.on('kernel_restarting.Kernel', record_status);
105 106 this.events.on('kernel_autorestarting.Kernel', record_status);
106 107 this.events.on('kernel_interrupting.Kernel', record_status);
107 108 this.events.on('kernel_disconnected.Kernel', record_status);
108 109 // these are commented out because they are triggered a lot, but can
109 110 // be uncommented for debugging purposes
110 111 //this.events.on('kernel_idle.Kernel', record_status);
111 112 //this.events.on('kernel_busy.Kernel', record_status);
112 113 this.events.on('kernel_ready.Kernel', record_status);
113 114 this.events.on('kernel_killed.Kernel', record_status);
114 115 this.events.on('kernel_dead.Kernel', record_status);
115 116
116 117 this.events.on('kernel_ready.Kernel', function () {
117 118 that._autorestart_attempt = 0;
118 119 });
119 120 this.events.on('kernel_connected.Kernel', function () {
120 121 that._reconnect_attempt = 0;
121 122 });
122 123 };
123 124
124 125 /**
125 126 * Initialize the iopub handlers.
126 127 *
127 128 * @function init_iopub_handlers
128 129 */
129 130 Kernel.prototype.init_iopub_handlers = function () {
130 131 var output_msg_types = ['stream', 'display_data', 'execute_result', 'error'];
131 132 this._iopub_handlers = {};
132 133 this.register_iopub_handler('status', $.proxy(this._handle_status_message, this));
133 134 this.register_iopub_handler('clear_output', $.proxy(this._handle_clear_output, this));
134 135 this.register_iopub_handler('execute_input', $.proxy(this._handle_input_message, this));
135 136
136 137 for (var i=0; i < output_msg_types.length; i++) {
137 138 this.register_iopub_handler(output_msg_types[i], $.proxy(this._handle_output_message, this));
138 139 }
139 140 };
140 141
141 142 /**
142 143 * GET /api/kernels
143 144 *
144 145 * Get the list of running kernels.
145 146 *
146 147 * @function list
147 148 * @param {function} [success] - function executed on ajax success
148 149 * @param {function} [error] - functon executed on ajax error
149 150 */
150 151 Kernel.prototype.list = function (success, error) {
151 152 $.ajax(this.kernel_service_url, {
152 153 processData: false,
153 154 cache: false,
154 155 type: "GET",
155 156 dataType: "json",
156 157 success: success,
157 158 error: this._on_error(error)
158 159 });
159 160 };
160 161
161 162 /**
162 163 * POST /api/kernels
163 164 *
164 165 * Start a new kernel.
165 166 *
166 167 * In general this shouldn't be used -- the kernel should be
167 168 * started through the session API. If you use this function and
168 169 * are also using the session API then your session and kernel
169 170 * WILL be out of sync!
170 171 *
171 172 * @function start
172 173 * @param {params} [Object] - parameters to include in the query string
173 174 * @param {function} [success] - function executed on ajax success
174 175 * @param {function} [error] - functon executed on ajax error
175 176 */
176 177 Kernel.prototype.start = function (params, success, error) {
177 178 var url = this.kernel_service_url;
178 179 var qs = $.param(params || {}); // query string for sage math stuff
179 180 if (qs !== "") {
180 181 url = url + "?" + qs;
181 182 }
182 183
183 184 this.events.trigger('kernel_starting.Kernel', {kernel: this});
184 185 var that = this;
185 186 var on_success = function (data, status, xhr) {
186 187 that.events.trigger('kernel_created.Kernel', {kernel: that});
187 188 that._kernel_created(data);
188 189 if (success) {
189 190 success(data, status, xhr);
190 191 }
191 192 };
192 193
193 194 $.ajax(url, {
194 195 processData: false,
195 196 cache: false,
196 197 type: "POST",
197 198 data: JSON.stringify({name: this.name}),
198 199 dataType: "json",
199 200 success: this._on_success(on_success),
200 201 error: this._on_error(error)
201 202 });
202 203
203 204 return url;
204 205 };
205 206
206 207 /**
207 208 * GET /api/kernels/[:kernel_id]
208 209 *
209 210 * Get information about the kernel.
210 211 *
211 212 * @function get_info
212 213 * @param {function} [success] - function executed on ajax success
213 214 * @param {function} [error] - functon executed on ajax error
214 215 */
215 216 Kernel.prototype.get_info = function (success, error) {
216 217 $.ajax(this.kernel_url, {
217 218 processData: false,
218 219 cache: false,
219 220 type: "GET",
220 221 dataType: "json",
221 222 success: this._on_success(success),
222 223 error: this._on_error(error)
223 224 });
224 225 };
225 226
226 227 /**
227 228 * DELETE /api/kernels/[:kernel_id]
228 229 *
229 230 * Shutdown the kernel.
230 231 *
231 232 * If you are also using sessions, then this function shoul NOT be
232 233 * used. Instead, use Session.delete. Otherwise, the session and
233 234 * kernel WILL be out of sync.
234 235 *
235 236 * @function kill
236 237 * @param {function} [success] - function executed on ajax success
237 238 * @param {function} [error] - functon executed on ajax error
238 239 */
239 240 Kernel.prototype.kill = function (success, error) {
240 241 this.events.trigger('kernel_killed.Kernel', {kernel: this});
241 242 this._kernel_dead();
242 243 $.ajax(this.kernel_url, {
243 244 processData: false,
244 245 cache: false,
245 246 type: "DELETE",
246 247 dataType: "json",
247 248 success: this._on_success(success),
248 249 error: this._on_error(error)
249 250 });
250 251 };
251 252
252 253 /**
253 254 * POST /api/kernels/[:kernel_id]/interrupt
254 255 *
255 256 * Interrupt the kernel.
256 257 *
257 258 * @function interrupt
258 259 * @param {function} [success] - function executed on ajax success
259 260 * @param {function} [error] - functon executed on ajax error
260 261 */
261 262 Kernel.prototype.interrupt = function (success, error) {
262 263 this.events.trigger('kernel_interrupting.Kernel', {kernel: this});
263 264
264 265 var that = this;
265 266 var on_success = function (data, status, xhr) {
266 267 /**
267 268 * get kernel info so we know what state the kernel is in
268 269 */
269 270 that.kernel_info();
270 271 if (success) {
271 272 success(data, status, xhr);
272 273 }
273 274 };
274 275
275 276 var url = utils.url_join_encode(this.kernel_url, 'interrupt');
276 277 $.ajax(url, {
277 278 processData: false,
278 279 cache: false,
279 280 type: "POST",
280 281 dataType: "json",
281 282 success: this._on_success(on_success),
282 283 error: this._on_error(error)
283 284 });
284 285 };
285 286
286 287 Kernel.prototype.restart = function (success, error) {
287 288 /**
288 289 * POST /api/kernels/[:kernel_id]/restart
289 290 *
290 291 * Restart the kernel.
291 292 *
292 293 * @function interrupt
293 294 * @param {function} [success] - function executed on ajax success
294 295 * @param {function} [error] - functon executed on ajax error
295 296 */
296 297 this.events.trigger('kernel_restarting.Kernel', {kernel: this});
297 298 this.stop_channels();
298 299
299 300 var that = this;
300 301 var on_success = function (data, status, xhr) {
301 302 that.events.trigger('kernel_created.Kernel', {kernel: that});
302 303 that._kernel_created(data);
303 304 if (success) {
304 305 success(data, status, xhr);
305 306 }
306 307 };
307 308
308 309 var on_error = function (xhr, status, err) {
309 310 that.events.trigger('kernel_dead.Kernel', {kernel: that});
310 311 that._kernel_dead();
311 312 if (error) {
312 313 error(xhr, status, err);
313 314 }
314 315 };
315 316
316 317 var url = utils.url_join_encode(this.kernel_url, 'restart');
317 318 $.ajax(url, {
318 319 processData: false,
319 320 cache: false,
320 321 type: "POST",
321 322 dataType: "json",
322 323 success: this._on_success(on_success),
323 324 error: this._on_error(on_error)
324 325 });
325 326 };
326 327
327 328 Kernel.prototype.reconnect = function () {
328 329 /**
329 330 * Reconnect to a disconnected kernel. This is not actually a
330 331 * standard HTTP request, but useful function nonetheless for
331 332 * reconnecting to the kernel if the connection is somehow lost.
332 333 *
333 334 * @function reconnect
334 335 */
335 336 if (this.is_connected()) {
336 337 return;
337 338 }
338 339 this._reconnect_attempt = this._reconnect_attempt + 1;
339 340 this.events.trigger('kernel_reconnecting.Kernel', {
340 341 kernel: this,
341 342 attempt: this._reconnect_attempt,
342 343 });
343 344 this.start_channels();
344 345 };
345 346
346 347 Kernel.prototype._on_success = function (success) {
347 348 /**
348 349 * Handle a successful AJAX request by updating the kernel id and
349 350 * name from the response, and then optionally calling a provided
350 351 * callback.
351 352 *
352 353 * @function _on_success
353 354 * @param {function} success - callback
354 355 */
355 356 var that = this;
356 357 return function (data, status, xhr) {
357 358 if (data) {
358 359 that.id = data.id;
359 360 that.name = data.name;
360 361 }
361 362 that.kernel_url = utils.url_join_encode(that.kernel_service_url, that.id);
362 363 if (success) {
363 364 success(data, status, xhr);
364 365 }
365 366 };
366 367 };
367 368
368 369 Kernel.prototype._on_error = function (error) {
369 370 /**
370 371 * Handle a failed AJAX request by logging the error message, and
371 372 * then optionally calling a provided callback.
372 373 *
373 374 * @function _on_error
374 375 * @param {function} error - callback
375 376 */
376 377 return function (xhr, status, err) {
377 378 utils.log_ajax_error(xhr, status, err);
378 379 if (error) {
379 380 error(xhr, status, err);
380 381 }
381 382 };
382 383 };
383 384
384 385 Kernel.prototype._kernel_created = function (data) {
385 386 /**
386 387 * Perform necessary tasks once the kernel has been started,
387 388 * including actually connecting to the kernel.
388 389 *
389 390 * @function _kernel_created
390 391 * @param {Object} data - information about the kernel including id
391 392 */
392 393 this.id = data.id;
393 394 this.kernel_url = utils.url_join_encode(this.kernel_service_url, this.id);
394 395 this.start_channels();
395 396 };
396 397
397 398 Kernel.prototype._kernel_connected = function () {
398 399 /**
399 400 * Perform necessary tasks once the connection to the kernel has
400 401 * been established. This includes requesting information about
401 402 * the kernel.
402 403 *
403 404 * @function _kernel_connected
404 405 */
405 406 this.events.trigger('kernel_connected.Kernel', {kernel: this});
406 407 // get kernel info so we know what state the kernel is in
407 408 var that = this;
408 409 this.kernel_info(function (reply) {
409 410 that.info_reply = reply.content;
410 411 that.events.trigger('kernel_ready.Kernel', {kernel: that});
411 412 });
412 413 };
413 414
414 415 Kernel.prototype._kernel_dead = function () {
415 416 /**
416 417 * Perform necessary tasks after the kernel has died. This closing
417 418 * communication channels to the kernel if they are still somehow
418 419 * open.
419 420 *
420 421 * @function _kernel_dead
421 422 */
422 423 this.stop_channels();
423 424 };
424 425
425 426 Kernel.prototype.start_channels = function () {
426 427 /**
427 428 * Start the websocket channels.
428 429 * Will stop and restart them if they already exist.
429 430 *
430 431 * @function start_channels
431 432 */
432 433 var that = this;
433 434 this.stop_channels();
434 435 var ws_host_url = this.ws_url + this.kernel_url;
435 436
436 437 console.log("Starting WebSockets:", ws_host_url);
437 438
438 439 this.ws = new this.WebSocket([
439 440 that.ws_url,
440 441 utils.url_join_encode(that.kernel_url, 'channels'),
441 442 "?session_id=" + that.session_id
442 443 ].join('')
443 444 );
444 445
445 446 var already_called_onclose = false; // only alert once
446 447 var ws_closed_early = function(evt){
447 448 if (already_called_onclose){
448 449 return;
449 450 }
450 451 already_called_onclose = true;
451 452 if ( ! evt.wasClean ){
452 453 // If the websocket was closed early, that could mean
453 454 // that the kernel is actually dead. Try getting
454 455 // information about the kernel from the API call --
455 456 // if that fails, then assume the kernel is dead,
456 457 // otherwise just follow the typical websocket closed
457 458 // protocol.
458 459 that.get_info(function () {
459 460 that._ws_closed(ws_host_url, false);
460 461 }, function () {
461 462 that.events.trigger('kernel_dead.Kernel', {kernel: that});
462 463 that._kernel_dead();
463 464 });
464 465 }
465 466 };
466 467 var ws_closed_late = function(evt){
467 468 if (already_called_onclose){
468 469 return;
469 470 }
470 471 already_called_onclose = true;
471 472 if ( ! evt.wasClean ){
472 473 that._ws_closed(ws_host_url, false);
473 474 }
474 475 };
475 476 var ws_error = function(evt){
476 477 if (already_called_onclose){
477 478 return;
478 479 }
479 480 already_called_onclose = true;
480 481 that._ws_closed(ws_host_url, true);
481 482 };
482 483
483 484 this.ws.onopen = $.proxy(this._ws_opened, this);
484 485 this.ws.onclose = ws_closed_early;
485 486 this.ws.onerror = ws_error;
486 487 // switch from early-close to late-close message after 1s
487 488 setTimeout(function() {
488 489 if (that.ws !== null) {
489 490 that.ws.onclose = ws_closed_late;
490 491 }
491 492 }, 1000);
492 493 this.ws.onmessage = $.proxy(this._handle_ws_message, this);
493 494 };
494 495
495 496 Kernel.prototype._ws_opened = function (evt) {
496 497 /**
497 498 * Handle a websocket entering the open state,
498 499 * signaling that the kernel is connected when websocket is open.
499 500 *
500 501 * @function _ws_opened
501 502 */
502 503 if (this.is_connected()) {
503 504 // all events ready, trigger started event.
504 505 this._kernel_connected();
505 506 }
506 507 };
507 508
508 509 Kernel.prototype._ws_closed = function(ws_url, error) {
509 510 /**
510 511 * Handle a websocket entering the closed state. If the websocket
511 512 * was not closed due to an error, try to reconnect to the kernel.
512 513 *
513 514 * @function _ws_closed
514 515 * @param {string} ws_url - the websocket url
515 516 * @param {bool} error - whether the connection was closed due to an error
516 517 */
517 518 this.stop_channels();
518 519
519 520 this.events.trigger('kernel_disconnected.Kernel', {kernel: this});
520 521 if (error) {
521 522 console.log('WebSocket connection failed: ', ws_url);
522 523 this.events.trigger('kernel_connection_failed.Kernel', {kernel: this, ws_url: ws_url, attempt: this._reconnect_attempt});
523 524 }
524 525 this._schedule_reconnect();
525 526 };
526 527
527 528 Kernel.prototype._schedule_reconnect = function () {
528 529 /**
529 530 * function to call when kernel connection is lost
530 531 * schedules reconnect, or fires 'connection_dead' if reconnect limit is hit
531 532 */
532 533 if (this._reconnect_attempt < this.reconnect_limit) {
533 534 var timeout = Math.pow(2, this._reconnect_attempt);
534 535 console.log("Connection lost, reconnecting in " + timeout + " seconds.");
535 536 setTimeout($.proxy(this.reconnect, this), 1e3 * timeout);
536 537 } else {
537 538 this.events.trigger('kernel_connection_dead.Kernel', {
538 539 kernel: this,
539 540 reconnect_attempt: this._reconnect_attempt,
540 541 });
541 542 console.log("Failed to reconnect, giving up.");
542 543 }
543 544 };
544 545
545 546 Kernel.prototype.stop_channels = function () {
546 547 /**
547 548 * Close the websocket. After successful close, the value
548 549 * in `this.ws` will be null.
549 550 *
550 551 * @function stop_channels
551 552 */
552 553 var that = this;
553 554 var close = function () {
554 555 if (that.ws && that.ws.readyState === WebSocket.CLOSED) {
555 556 that.ws = null;
556 557 }
557 558 };
558 559 if (this.ws !== null) {
559 560 if (this.ws.readyState === WebSocket.OPEN) {
560 561 this.ws.onclose = close;
561 562 this.ws.close();
562 563 } else {
563 564 close();
564 565 }
565 566 }
566 567 };
567 568
568 569 Kernel.prototype.is_connected = function () {
569 570 /**
570 571 * Check whether there is a connection to the kernel. This
571 572 * function only returns true if websocket has been
572 573 * created and has a state of WebSocket.OPEN.
573 574 *
574 575 * @function is_connected
575 576 * @returns {bool} - whether there is a connection
576 577 */
577 578 // if any channel is not ready, then we're not connected
578 579 if (this.ws === null) {
579 580 return false;
580 581 }
581 582 if (this.ws.readyState !== WebSocket.OPEN) {
582 583 return false;
583 584 }
584 585 return true;
585 586 };
586 587
587 588 Kernel.prototype.is_fully_disconnected = function () {
588 589 /**
589 590 * Check whether the connection to the kernel has been completely
590 591 * severed. This function only returns true if all channel objects
591 592 * are null.
592 593 *
593 594 * @function is_fully_disconnected
594 595 * @returns {bool} - whether the kernel is fully disconnected
595 596 */
596 597 return (this.ws === null);
597 598 };
598 599
599 600 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
600 601 /**
601 602 * Send a message on the Kernel's shell channel
602 603 *
603 604 * @function send_shell_message
604 605 */
605 606 if (!this.is_connected()) {
606 607 throw new Error("kernel is not connected");
607 608 }
608 609 var msg = this._get_msg(msg_type, content, metadata, buffers);
609 610 msg.channel = 'shell';
610 611 this.ws.send(serialize.serialize(msg));
611 612 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
612 613 return msg.header.msg_id;
613 614 };
614 615
615 616 Kernel.prototype.kernel_info = function (callback) {
616 617 /**
617 618 * Get kernel info
618 619 *
619 620 * @function kernel_info
620 621 * @param callback {function}
621 622 *
622 623 * When calling this method, pass a callback function that expects one argument.
623 624 * The callback will be passed the complete `kernel_info_reply` message documented
624 625 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#kernel-info)
625 626 */
626 627 var callbacks;
627 628 if (callback) {
628 629 callbacks = { shell : { reply : callback } };
629 630 }
630 631 return this.send_shell_message("kernel_info_request", {}, callbacks);
631 632 };
632 633
633 634 Kernel.prototype.inspect = function (code, cursor_pos, callback) {
634 635 /**
635 636 * Get info on an object
636 637 *
637 638 * When calling this method, pass a callback function that expects one argument.
638 639 * The callback will be passed the complete `inspect_reply` message documented
639 640 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#object-information)
640 641 *
641 642 * @function inspect
642 643 * @param code {string}
643 644 * @param cursor_pos {integer}
644 645 * @param callback {function}
645 646 */
646 647 var callbacks;
647 648 if (callback) {
648 649 callbacks = { shell : { reply : callback } };
649 650 }
650 651
651 652 var content = {
652 653 code : code,
653 654 cursor_pos : cursor_pos,
654 655 detail_level : 0
655 656 };
656 657 return this.send_shell_message("inspect_request", content, callbacks);
657 658 };
658 659
659 660 Kernel.prototype.execute = function (code, callbacks, options) {
660 661 /**
661 662 * Execute given code into kernel, and pass result to callback.
662 663 *
663 664 * @async
664 665 * @function execute
665 666 * @param {string} code
666 667 * @param [callbacks] {Object} With the following keys (all optional)
667 668 * @param callbacks.shell.reply {function}
668 669 * @param callbacks.shell.payload.[payload_name] {function}
669 670 * @param callbacks.iopub.output {function}
670 671 * @param callbacks.iopub.clear_output {function}
671 672 * @param callbacks.input {function}
672 673 * @param {object} [options]
673 674 * @param [options.silent=false] {Boolean}
674 675 * @param [options.user_expressions=empty_dict] {Dict}
675 676 * @param [options.allow_stdin=false] {Boolean} true|false
676 677 *
677 678 * @example
678 679 *
679 680 * The options object should contain the options for the execute
680 681 * call. Its default values are:
681 682 *
682 683 * options = {
683 684 * silent : true,
684 685 * user_expressions : {},
685 686 * allow_stdin : false
686 687 * }
687 688 *
688 689 * When calling this method pass a callbacks structure of the
689 690 * form:
690 691 *
691 692 * callbacks = {
692 693 * shell : {
693 694 * reply : execute_reply_callback,
694 695 * payload : {
695 696 * set_next_input : set_next_input_callback,
696 697 * }
697 698 * },
698 699 * iopub : {
699 700 * output : output_callback,
700 701 * clear_output : clear_output_callback,
701 702 * },
702 703 * input : raw_input_callback
703 704 * }
704 705 *
705 706 * Each callback will be passed the entire message as a single
706 707 * arugment. Payload handlers will be passed the corresponding
707 708 * payload and the execute_reply message.
708 709 */
709 710 var content = {
710 711 code : code,
711 712 silent : true,
712 713 store_history : false,
713 714 user_expressions : {},
714 715 allow_stdin : false
715 716 };
716 717 callbacks = callbacks || {};
717 718 if (callbacks.input !== undefined) {
718 719 content.allow_stdin = true;
719 720 }
720 721 $.extend(true, content, options);
721 722 this.events.trigger('execution_request.Kernel', {kernel: this, content: content});
722 723 return this.send_shell_message("execute_request", content, callbacks);
723 724 };
724 725
725 726 /**
726 727 * When calling this method, pass a function to be called with the
727 728 * `complete_reply` message as its only argument when it arrives.
728 729 *
729 730 * `complete_reply` is documented
730 731 * [here](http://ipython.org/ipython-doc/dev/development/messaging.html#complete)
731 732 *
732 733 * @function complete
733 734 * @param code {string}
734 735 * @param cursor_pos {integer}
735 736 * @param callback {function}
736 737 */
737 738 Kernel.prototype.complete = function (code, cursor_pos, callback) {
738 739 var callbacks;
739 740 if (callback) {
740 741 callbacks = { shell : { reply : callback } };
741 742 }
742 743 var content = {
743 744 code : code,
744 745 cursor_pos : cursor_pos
745 746 };
746 747 return this.send_shell_message("complete_request", content, callbacks);
747 748 };
748 749
749 750 /**
750 751 * @function send_input_reply
751 752 */
752 753 Kernel.prototype.send_input_reply = function (input) {
753 754 if (!this.is_connected()) {
754 755 throw new Error("kernel is not connected");
755 756 }
756 757 var content = {
757 758 value : input
758 759 };
759 760 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
760 761 var msg = this._get_msg("input_reply", content);
761 762 msg.channel = 'stdin';
762 763 this.ws.send(serialize.serialize(msg));
763 764 return msg.header.msg_id;
764 765 };
765 766
766 767 /**
767 768 * @function register_iopub_handler
768 769 */
769 770 Kernel.prototype.register_iopub_handler = function (msg_type, callback) {
770 771 this._iopub_handlers[msg_type] = callback;
771 772 };
772 773
773 774 /**
774 775 * Get the iopub handler for a specific message type.
775 776 *
776 777 * @function get_iopub_handler
777 778 */
778 779 Kernel.prototype.get_iopub_handler = function (msg_type) {
779 780 return this._iopub_handlers[msg_type];
780 781 };
781 782
782 783 /**
783 784 * Get callbacks for a specific message.
784 785 *
785 786 * @function get_callbacks_for_msg
786 787 */
787 788 Kernel.prototype.get_callbacks_for_msg = function (msg_id) {
788 789 if (msg_id == this.last_msg_id) {
789 790 return this.last_msg_callbacks;
790 791 } else {
791 792 return this._msg_callbacks[msg_id];
792 793 }
793 794 };
794 795
795 796 /**
796 797 * Clear callbacks for a specific message.
797 798 *
798 799 * @function clear_callbacks_for_msg
799 800 */
800 801 Kernel.prototype.clear_callbacks_for_msg = function (msg_id) {
801 802 if (this._msg_callbacks[msg_id] !== undefined ) {
802 803 delete this._msg_callbacks[msg_id];
803 804 }
804 805 };
805 806
806 807 /**
807 808 * @function _finish_shell
808 809 */
809 810 Kernel.prototype._finish_shell = function (msg_id) {
810 811 var callbacks = this._msg_callbacks[msg_id];
811 812 if (callbacks !== undefined) {
812 813 callbacks.shell_done = true;
813 814 if (callbacks.iopub_done) {
814 815 this.clear_callbacks_for_msg(msg_id);
815 816 }
816 817 }
817 818 };
818 819
819 820 /**
820 821 * @function _finish_iopub
821 822 */
822 823 Kernel.prototype._finish_iopub = function (msg_id) {
823 824 var callbacks = this._msg_callbacks[msg_id];
824 825 if (callbacks !== undefined) {
825 826 callbacks.iopub_done = true;
826 827 if (callbacks.shell_done) {
827 828 this.clear_callbacks_for_msg(msg_id);
828 829 }
829 830 }
830 831 };
831 832
832 833 /**
833 834 * Set callbacks for a particular message.
834 835 * Callbacks should be a struct of the following form:
835 836 * shell : {
836 837 *
837 838 * }
838 839 *
839 840 * @function set_callbacks_for_msg
840 841 */
841 842 Kernel.prototype.set_callbacks_for_msg = function (msg_id, callbacks) {
842 843 this.last_msg_id = msg_id;
843 844 if (callbacks) {
844 845 // shallow-copy mapping, because we will modify it at the top level
845 846 var cbcopy = this._msg_callbacks[msg_id] = this.last_msg_callbacks = {};
846 847 cbcopy.shell = callbacks.shell;
847 848 cbcopy.iopub = callbacks.iopub;
848 849 cbcopy.input = callbacks.input;
849 850 cbcopy.shell_done = (!callbacks.shell);
850 851 cbcopy.iopub_done = (!callbacks.iopub);
851 852 } else {
852 853 this.last_msg_callbacks = {};
853 854 }
854 855 };
855 856
856 857 Kernel.prototype._handle_ws_message = function (e) {
857 serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
858 this._msg_queue = this._msg_queue.then(function() {
859 return serialize.deserialize(e.data);
860 }).then($.proxy(this._finish_ws_message, this))
861 .catch(utils.reject("Couldn't process kernel message", true));
858 862 };
859 863
860 864 Kernel.prototype._finish_ws_message = function (msg) {
861 865 switch (msg.channel) {
862 866 case 'shell':
863 867 this._handle_shell_reply(msg);
864 868 break;
865 869 case 'iopub':
866 870 this._handle_iopub_message(msg);
867 871 break;
868 872 case 'stdin':
869 873 this._handle_input_request(msg);
870 874 break;
871 875 default:
872 876 console.error("unrecognized message channel", msg.channel, msg);
873 877 }
874 878 };
875 879
876 880 Kernel.prototype._handle_shell_reply = function (reply) {
877 881 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
878 882 var content = reply.content;
879 883 var metadata = reply.metadata;
880 884 var parent_id = reply.parent_header.msg_id;
881 885 var callbacks = this.get_callbacks_for_msg(parent_id);
882 886 if (!callbacks || !callbacks.shell) {
883 887 return;
884 888 }
885 889 var shell_callbacks = callbacks.shell;
886 890
887 891 // signal that shell callbacks are done
888 892 this._finish_shell(parent_id);
889 893
890 894 if (shell_callbacks.reply !== undefined) {
891 895 shell_callbacks.reply(reply);
892 896 }
893 897 if (content.payload && shell_callbacks.payload) {
894 898 this._handle_payloads(content.payload, shell_callbacks.payload, reply);
895 899 }
896 900 };
897 901
898 902 /**
899 903 * @function _handle_payloads
900 904 */
901 905 Kernel.prototype._handle_payloads = function (payloads, payload_callbacks, msg) {
902 906 var l = payloads.length;
903 907 // Payloads are handled by triggering events because we don't want the Kernel
904 908 // to depend on the Notebook or Pager classes.
905 909 for (var i=0; i<l; i++) {
906 910 var payload = payloads[i];
907 911 var callback = payload_callbacks[payload.source];
908 912 if (callback) {
909 913 callback(payload, msg);
910 914 }
911 915 }
912 916 };
913 917
914 918 /**
915 919 * @function _handle_status_message
916 920 */
917 921 Kernel.prototype._handle_status_message = function (msg) {
918 922 var execution_state = msg.content.execution_state;
919 923 var parent_id = msg.parent_header.msg_id;
920 924
921 925 // dispatch status msg callbacks, if any
922 926 var callbacks = this.get_callbacks_for_msg(parent_id);
923 927 if (callbacks && callbacks.iopub && callbacks.iopub.status) {
924 928 try {
925 929 callbacks.iopub.status(msg);
926 930 } catch (e) {
927 931 console.log("Exception in status msg handler", e, e.stack);
928 932 }
929 933 }
930 934
931 935 if (execution_state === 'busy') {
932 936 this.events.trigger('kernel_busy.Kernel', {kernel: this});
933 937
934 938 } else if (execution_state === 'idle') {
935 939 // signal that iopub callbacks are (probably) done
936 940 // async output may still arrive,
937 941 // but only for the most recent request
938 942 this._finish_iopub(parent_id);
939 943
940 944 // trigger status_idle event
941 945 this.events.trigger('kernel_idle.Kernel', {kernel: this});
942 946
943 947 } else if (execution_state === 'starting') {
944 948 this.events.trigger('kernel_starting.Kernel', {kernel: this});
945 949 var that = this;
946 950 this.kernel_info(function (reply) {
947 951 that.info_reply = reply.content;
948 952 that.events.trigger('kernel_ready.Kernel', {kernel: that});
949 953 });
950 954
951 955 } else if (execution_state === 'restarting') {
952 956 // autorestarting is distinct from restarting,
953 957 // in that it means the kernel died and the server is restarting it.
954 958 // kernel_restarting sets the notification widget,
955 959 // autorestart shows the more prominent dialog.
956 960 this._autorestart_attempt = this._autorestart_attempt + 1;
957 961 this.events.trigger('kernel_restarting.Kernel', {kernel: this});
958 962 this.events.trigger('kernel_autorestarting.Kernel', {kernel: this, attempt: this._autorestart_attempt});
959 963
960 964 } else if (execution_state === 'dead') {
961 965 this.events.trigger('kernel_dead.Kernel', {kernel: this});
962 966 this._kernel_dead();
963 967 }
964 968 };
965 969
966 970 /**
967 971 * Handle clear_output message
968 972 *
969 973 * @function _handle_clear_output
970 974 */
971 975 Kernel.prototype._handle_clear_output = function (msg) {
972 976 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
973 977 if (!callbacks || !callbacks.iopub) {
974 978 return;
975 979 }
976 980 var callback = callbacks.iopub.clear_output;
977 981 if (callback) {
978 982 callback(msg);
979 983 }
980 984 };
981 985
982 986 /**
983 987 * handle an output message (execute_result, display_data, etc.)
984 988 *
985 989 * @function _handle_output_message
986 990 */
987 991 Kernel.prototype._handle_output_message = function (msg) {
988 992 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
989 993 if (!callbacks || !callbacks.iopub) {
990 994 // The message came from another client. Let the UI decide what to
991 995 // do with it.
992 996 this.events.trigger('received_unsolicited_message.Kernel', msg);
993 997 return;
994 998 }
995 999 var callback = callbacks.iopub.output;
996 1000 if (callback) {
997 1001 callback(msg);
998 1002 }
999 1003 };
1000 1004
1001 1005 /**
1002 1006 * Handle an input message (execute_input).
1003 1007 *
1004 1008 * @function _handle_input message
1005 1009 */
1006 1010 Kernel.prototype._handle_input_message = function (msg) {
1007 1011 var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id);
1008 1012 if (!callbacks) {
1009 1013 // The message came from another client. Let the UI decide what to
1010 1014 // do with it.
1011 1015 this.events.trigger('received_unsolicited_message.Kernel', msg);
1012 1016 }
1013 1017 };
1014 1018
1015 1019 /**
1016 1020 * Dispatch IOPub messages to respective handlers. Each message
1017 1021 * type should have a handler.
1018 1022 *
1019 1023 * @function _handle_iopub_message
1020 1024 */
1021 1025 Kernel.prototype._handle_iopub_message = function (msg) {
1022 1026 var handler = this.get_iopub_handler(msg.header.msg_type);
1023 1027 if (handler !== undefined) {
1024 1028 handler(msg);
1025 1029 }
1026 1030 };
1027 1031
1028 1032 /**
1029 1033 * @function _handle_input_request
1030 1034 */
1031 1035 Kernel.prototype._handle_input_request = function (request) {
1032 1036 var header = request.header;
1033 1037 var content = request.content;
1034 1038 var metadata = request.metadata;
1035 1039 var msg_type = header.msg_type;
1036 1040 if (msg_type !== 'input_request') {
1037 1041 console.log("Invalid input request!", request);
1038 1042 return;
1039 1043 }
1040 1044 var callbacks = this.get_callbacks_for_msg(request.parent_header.msg_id);
1041 1045 if (callbacks) {
1042 1046 if (callbacks.input) {
1043 1047 callbacks.input(request);
1044 1048 }
1045 1049 }
1046 1050 };
1047 1051
1048 1052 // Backwards compatability.
1049 1053 IPython.Kernel = Kernel;
1050 1054
1051 1055 return {'Kernel': Kernel};
1052 1056 });
@@ -1,120 +1,123 b''
1 1 // Copyright (c) IPython Development Team.
2 2 // Distributed under the terms of the Modified BSD License.
3 3
4 4 define([
5 5 'underscore',
6 6 ], function (_) {
7 7 "use strict";
8 8
9 9 var _deserialize_array_buffer = function (buf) {
10 10 var data = new DataView(buf);
11 11 // read the header: 1 + nbufs 32b integers
12 12 var nbufs = data.getUint32(0);
13 13 var offsets = [];
14 14 var i;
15 15 for (i = 1; i <= nbufs; i++) {
16 16 offsets.push(data.getUint32(i * 4));
17 17 }
18 18 var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
19 19 var msg = JSON.parse(
20 20 (new TextDecoder('utf8')).decode(json_bytes)
21 21 );
22 22 // the remaining chunks are stored as DataViews in msg.buffers
23 23 msg.buffers = [];
24 24 var start, stop;
25 25 for (i = 1; i < nbufs; i++) {
26 26 start = offsets[i];
27 27 stop = offsets[i+1] || buf.byteLength;
28 28 msg.buffers.push(new DataView(buf.slice(start, stop)));
29 29 }
30 30 return msg;
31 31 };
32 32
33 var _deserialize_binary = function(data, callback) {
33 var _deserialize_binary = function(data) {
34 34 /**
35 35 * deserialize the binary message format
36 36 * callback will be called with a message whose buffers attribute
37 37 * will be an array of DataViews.
38 38 */
39 39 if (data instanceof Blob) {
40 40 // data is Blob, have to deserialize from ArrayBuffer in reader callback
41 41 var reader = new FileReader();
42 reader.onload = function () {
43 var msg = _deserialize_array_buffer(this.result);
44 callback(msg);
45 };
42 var promise = new Promise(function(resolve, reject) {
43 reader.onload = function () {
44 var msg = _deserialize_array_buffer(this.result);
45 resolve(msg);
46 };
47 });
46 48 reader.readAsArrayBuffer(data);
49 return promise;
47 50 } else {
48 51 // data is ArrayBuffer, can deserialize directly
49 52 var msg = _deserialize_array_buffer(data);
50 callback(msg);
53 return msg;
51 54 }
52 55 };
53 56
54 var deserialize = function (data, callback) {
57 var deserialize = function (data) {
55 58 /**
56 * deserialize a message and pass the unpacked message object to callback
59 * deserialize a message and return a promise for the unpacked message
57 60 */
58 61 if (typeof data === "string") {
59 62 // text JSON message
60 callback(JSON.parse(data));
63 return Promise.resolve(JSON.parse(data));
61 64 } else {
62 65 // binary message
63 _deserialize_binary(data, callback);
66 return Promise.resolve(_deserialize_binary(data));
64 67 }
65 68 };
66 69
67 70 var _serialize_binary = function (msg) {
68 71 /**
69 72 * implement the binary serialization protocol
70 73 * serializes JSON message to ArrayBuffer
71 74 */
72 75 msg = _.clone(msg);
73 76 var offsets = [];
74 77 var buffers = [];
75 78 msg.buffers.map(function (buf) {
76 79 buffers.push(buf);
77 80 });
78 81 delete msg.buffers;
79 82 var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg));
80 83 buffers.unshift(json_utf8);
81 84 var nbufs = buffers.length;
82 85 offsets.push(4 * (nbufs + 1));
83 86 var i;
84 87 for (i = 0; i + 1 < buffers.length; i++) {
85 88 offsets.push(offsets[offsets.length-1] + buffers[i].byteLength);
86 89 }
87 90 var msg_buf = new Uint8Array(
88 91 offsets[offsets.length-1] + buffers[buffers.length-1].byteLength
89 92 );
90 93 // use DataView.setUint32 for network byte-order
91 94 var view = new DataView(msg_buf.buffer);
92 95 // write nbufs to first 4 bytes
93 96 view.setUint32(0, nbufs);
94 97 // write offsets to next 4 * nbufs bytes
95 98 for (i = 0; i < offsets.length; i++) {
96 99 view.setUint32(4 * (i+1), offsets[i]);
97 100 }
98 101 // write all the buffers at their respective offsets
99 102 for (i = 0; i < buffers.length; i++) {
100 103 msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]);
101 104 }
102 105
103 106 // return raw ArrayBuffer
104 107 return msg_buf.buffer;
105 108 };
106 109
107 110 var serialize = function (msg) {
108 111 if (msg.buffers && msg.buffers.length) {
109 112 return _serialize_binary(msg);
110 113 } else {
111 114 return JSON.stringify(msg);
112 115 }
113 116 };
114 117
115 118 var exports = {
116 119 deserialize : deserialize,
117 120 serialize: serialize
118 121 };
119 122 return exports;
120 }); No newline at end of file
123 });
General Comments 0
You need to be logged in to leave comments. Login now