##// END OF EJS Templates
Merge pull request #6110 from minrk/binarycomm...
Matthias Bussonnier -
r18418:2a8a2c87 merge
parent child Browse files
Show More
@@ -0,0 +1,114 b''
1 // Copyright (c) IPython Development Team.
2 // Distributed under the terms of the Modified BSD License.
3
4 define([
5 'underscore',
6 ], function (_) {
7 "use strict";
8
9 var _deserialize_array_buffer = function (buf) {
10 var data = new DataView(buf);
11 // read the header: 1 + nbufs 32b integers
12 var nbufs = data.getUint32(0);
13 var offsets = [];
14 var i;
15 for (i = 1; i <= nbufs; i++) {
16 offsets.push(data.getUint32(i * 4));
17 }
18 var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
19 var msg = JSON.parse(
20 (new TextDecoder('utf8')).decode(json_bytes)
21 );
22 // the remaining chunks are stored as DataViews in msg.buffers
23 msg.buffers = [];
24 var start, stop;
25 for (i = 1; i < nbufs; i++) {
26 start = offsets[i];
27 stop = offsets[i+1] || buf.byteLength;
28 msg.buffers.push(new DataView(buf.slice(start, stop)));
29 }
30 return msg;
31 };
32
33 var _deserialize_binary = function(data, callback) {
34 // deserialize the binary message format
35 // callback will be called with a message whose buffers attribute
36 // will be an array of DataViews.
37 if (data instanceof Blob) {
38 // data is Blob, have to deserialize from ArrayBuffer in reader callback
39 var reader = new FileReader();
40 reader.onload = function () {
41 var msg = _deserialize_array_buffer(this.result);
42 callback(msg);
43 };
44 reader.readAsArrayBuffer(data);
45 } else {
46 // data is ArrayBuffer, can deserialize directly
47 var msg = _deserialize_array_buffer(data);
48 callback(msg);
49 }
50 };
51
52 var deserialize = function (data, callback) {
53 // deserialize a message and pass the unpacked message object to callback
54 if (typeof data === "string") {
55 // text JSON message
56 callback(JSON.parse(data));
57 } else {
58 // binary message
59 _deserialize_binary(data, callback);
60 }
61 };
62
63 var _serialize_binary = function (msg) {
64 // implement the binary serialization protocol
65 // serializes JSON message to ArrayBuffer
66 msg = _.clone(msg);
67 var offsets = [];
68 var buffers = [];
69 msg.buffers.map(function (buf) {
70 buffers.push(buf);
71 });
72 delete msg.buffers;
73 var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg));
74 buffers.unshift(json_utf8);
75 var nbufs = buffers.length;
76 offsets.push(4 * (nbufs + 1));
77 var i;
78 for (i = 0; i + 1 < buffers.length; i++) {
79 offsets.push(offsets[offsets.length-1] + buffers[i].byteLength);
80 }
81 var msg_buf = new Uint8Array(
82 offsets[offsets.length-1] + buffers[buffers.length-1].byteLength
83 );
84 // use DataView.setUint32 for network byte-order
85 var view = new DataView(msg_buf.buffer);
86 // write nbufs to first 4 bytes
87 view.setUint32(0, nbufs);
88 // write offsets to next 4 * nbufs bytes
89 for (i = 0; i < offsets.length; i++) {
90 view.setUint32(4 * (i+1), offsets[i]);
91 }
92 // write all the buffers at their respective offsets
93 for (i = 0; i < buffers.length; i++) {
94 msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]);
95 }
96
97 // return raw ArrayBuffer
98 return msg_buf.buffer;
99 };
100
101 var serialize = function (msg) {
102 if (msg.buffers && msg.buffers.length) {
103 return _serialize_binary(msg);
104 } else {
105 return JSON.stringify(msg);
106 }
107 };
108
109 var exports = {
110 deserialize : deserialize,
111 serialize: serialize
112 };
113 return exports;
114 }); No newline at end of file
@@ -0,0 +1,113 b''
1 //
2 // Test binary messages on websockets.
3 // Only works on slimer for now, due to old websocket impl in phantomjs.
4 //
5
6 casper.notebook_test(function () {
7 if (!this.slimerjs) {
8 console.log("Can't test binary websockets on phantomjs.");
9 return;
10 }
11 // create EchoBuffers target on js-side.
12 // it just captures and echos comm messages.
13 this.then(function () {
14 var success = this.evaluate(function () {
15 IPython._msgs = [];
16
17 var EchoBuffers = function(comm) {
18 this.comm = comm;
19 this.comm.on_msg($.proxy(this.on_msg, this));
20 };
21
22 EchoBuffers.prototype.on_msg = function (msg) {
23 IPython._msgs.push(msg);
24 this.comm.send(msg.content.data, {}, {}, msg.buffers);
25 };
26
27 IPython.notebook.kernel.comm_manager.register_target("echo", function (comm) {
28 return new EchoBuffers(comm);
29 });
30
31 return true;
32 });
33 this.test.assertEquals(success, true, "Created echo comm target");
34 });
35
36 // Create a similar comm that captures messages Python-side
37 this.then(function () {
38 var index = this.append_cell([
39 "import os",
40 "from IPython.kernel.comm import Comm",
41 "comm = Comm(target_name='echo')",
42 "msgs = []",
43 "def on_msg(msg):",
44 " msgs.append(msg)",
45 "comm.on_msg(on_msg)"
46 ].join('\n'), 'code');
47 this.execute_cell(index);
48 });
49
50 // send a message with binary data
51 this.then(function () {
52 var index = this.append_cell([
53 "buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']",
54 "comm.send(data='hi', buffers=buffers)"
55 ].join('\n'), 'code');
56 this.execute_cell(index);
57 });
58
59 // wait for capture
60 this.waitFor(function () {
61 return this.evaluate(function () {
62 return IPython._msgs.length > 0;
63 });
64 });
65
66 // validate captured buffers js-side
67 this.then(function () {
68 var msgs = this.evaluate(function () {
69 return IPython._msgs;
70 });
71 this.test.assertEquals(msgs.length, 1, "Captured comm message");
72 var buffers = msgs[0].buffers;
73 this.test.assertEquals(buffers.length, 2, "comm message has buffers");
74
75 // extract attributes to test in evaluate,
76 // because the raw DataViews can't be passed across
77 var buf_info = function (index) {
78 var buf = IPython._msgs[0].buffers[index];
79 var data = {};
80 data.byteLength = buf.byteLength;
81 data.bytes = [];
82 for (var i = 0; i < data.byteLength; i++) {
83 data.bytes.push(buf.getUint8(i));
84 }
85 return data;
86 };
87
88 buf0 = this.evaluate(buf_info, 0);
89 buf1 = this.evaluate(buf_info, 1);
90 this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size');
91 this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes');
92 this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size');
93 this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes');
94 });
95
96 // validate captured buffers Python-side
97 this.then(function () {
98 var index = this.append_cell([
99 "assert len(msgs) == 1, len(msgs)",
100 "bufs = msgs[0]['buffers']",
101 "assert len(bufs) == len(buffers), bufs",
102 "assert bufs[0].bytes == buffers[0], bufs[0].bytes",
103 "assert bufs[1].bytes == buffers[1], bufs[1].bytes",
104 "1",
105 ].join('\n'), 'code');
106 this.execute_cell(index);
107 this.wait_for_output(index);
108 this.then(function () {
109 var out = this.get_output_cell(index);
110 this.test.assertEquals(out['text/plain'], '1', "Python received buffers");
111 });
112 });
113 });
@@ -0,0 +1,26 b''
1 """Test serialize/deserialize messages with buffers"""
2
3 import os
4
5 import nose.tools as nt
6
7 from IPython.kernel.zmq.session import Session
8 from ..base.zmqhandlers import (
9 serialize_binary_message,
10 deserialize_binary_message,
11 )
12
13 def test_serialize_binary():
14 s = Session()
15 msg = s.msg('data_pub', content={'a': 'b'})
16 msg['buffers'] = [ os.urandom(3) for i in range(3) ]
17 bmsg = serialize_binary_message(msg)
18 nt.assert_is_instance(bmsg, bytes)
19
20 def test_deserialize_binary():
21 s = Session()
22 msg = s.msg('data_pub', content={'a': 'b'})
23 msg['buffers'] = [ os.urandom(2) for i in range(3) ]
24 bmsg = serialize_binary_message(msg)
25 msg2 = deserialize_binary_message(bmsg)
26 nt.assert_equal(msg2, msg)
@@ -4,6 +4,7 b''
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7 import struct
7
8
8 try:
9 try:
9 from urllib.parse import urlparse # Py 3
10 from urllib.parse import urlparse # Py 3
@@ -22,12 +23,70 b' from tornado import web'
22 from tornado import websocket
23 from tornado import websocket
23
24
24 from IPython.kernel.zmq.session import Session
25 from IPython.kernel.zmq.session import Session
25 from IPython.utils.jsonutil import date_default
26 from IPython.utils.jsonutil import date_default, extract_dates
26 from IPython.utils.py3compat import PY3, cast_unicode
27 from IPython.utils.py3compat import PY3, cast_unicode
27
28
28 from .handlers import IPythonHandler
29 from .handlers import IPythonHandler
29
30
30
31
32 def serialize_binary_message(msg):
33 """serialize a message as a binary blob
34
35 Header:
36
37 4 bytes: number of msg parts (nbufs) as 32b int
38 4 * nbufs bytes: offset for each buffer as integer as 32b int
39
40 Offsets are from the start of the buffer, including the header.
41
42 Returns
43 -------
44
45 The message serialized to bytes.
46
47 """
48 # don't modify msg or buffer list in-place
49 msg = msg.copy()
50 buffers = list(msg.pop('buffers'))
51 bmsg = json.dumps(msg, default=date_default).encode('utf8')
52 buffers.insert(0, bmsg)
53 nbufs = len(buffers)
54 offsets = [4 * (nbufs + 1)]
55 for buf in buffers[:-1]:
56 offsets.append(offsets[-1] + len(buf))
57 offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
58 buffers.insert(0, offsets_buf)
59 return b''.join(buffers)
60
61
62 def deserialize_binary_message(bmsg):
63 """deserialize a message from a binary blog
64
65 Header:
66
67 4 bytes: number of msg parts (nbufs) as 32b int
68 4 * nbufs bytes: offset for each buffer as integer as 32b int
69
70 Offsets are from the start of the buffer, including the header.
71
72 Returns
73 -------
74
75 message dictionary
76 """
77 nbufs = struct.unpack('!i', bmsg[:4])[0]
78 offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
79 offsets.append(None)
80 bufs = []
81 for start, stop in zip(offsets[:-1], offsets[1:]):
82 bufs.append(bmsg[start:stop])
83 msg = json.loads(bufs[0].decode('utf8'))
84 msg['header'] = extract_dates(msg['header'])
85 msg['parent_header'] = extract_dates(msg['parent_header'])
86 msg['buffers'] = bufs[1:]
87 return msg
88
89
31 class ZMQStreamHandler(websocket.WebSocketHandler):
90 class ZMQStreamHandler(websocket.WebSocketHandler):
32
91
33 def check_origin(self, origin):
92 def check_origin(self, origin):
@@ -77,23 +136,19 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
77 def _reserialize_reply(self, msg_list):
136 def _reserialize_reply(self, msg_list):
78 """Reserialize a reply message using JSON.
137 """Reserialize a reply message using JSON.
79
138
80 This takes the msg list from the ZMQ socket, unserializes it using
139 This takes the msg list from the ZMQ socket, deserializes it using
81 self.session and then serializes the result using JSON. This method
140 self.session and then serializes the result using JSON. This method
82 should be used by self._on_zmq_reply to build messages that can
141 should be used by self._on_zmq_reply to build messages that can
83 be sent back to the browser.
142 be sent back to the browser.
84 """
143 """
85 idents, msg_list = self.session.feed_identities(msg_list)
144 idents, msg_list = self.session.feed_identities(msg_list)
86 msg = self.session.unserialize(msg_list)
145 msg = self.session.deserialize(msg_list)
87 try:
146 if msg['buffers']:
88 msg['header'].pop('date')
147 buf = serialize_binary_message(msg)
89 except KeyError:
148 return buf
90 pass
149 else:
91 try:
150 smsg = json.dumps(msg, default=date_default)
92 msg['parent_header'].pop('date')
151 return cast_unicode(smsg)
93 except KeyError:
94 pass
95 msg.pop('buffers')
96 return json.dumps(msg, default=date_default)
97
152
98 def _on_zmq_reply(self, msg_list):
153 def _on_zmq_reply(self, msg_list):
99 # Sometimes this gets triggered when the on_close method is scheduled in the
154 # Sometimes this gets triggered when the on_close method is scheduled in the
@@ -104,7 +159,7 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
104 except Exception:
159 except Exception:
105 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
160 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
106 else:
161 else:
107 self.write_message(msg)
162 self.write_message(msg, binary=isinstance(msg, bytes))
108
163
109 def allow_draft76(self):
164 def allow_draft76(self):
110 """Allow draft 76, until browsers such as Safari update to RFC 6455.
165 """Allow draft 76, until browsers such as Safari update to RFC 6455.
@@ -12,7 +12,7 b' from IPython.utils.py3compat import string_types'
12 from IPython.html.utils import url_path_join, url_escape
12 from IPython.html.utils import url_path_join, url_escape
13
13
14 from ...base.handlers import IPythonHandler, json_errors
14 from ...base.handlers import IPythonHandler, json_errors
15 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
15 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
16
16
17 from IPython.core.release import kernel_protocol_version
17 from IPython.core.release import kernel_protocol_version
18
18
@@ -110,7 +110,7 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
110 """
110 """
111 idents,msg = self.session.feed_identities(msg)
111 idents,msg = self.session.feed_identities(msg)
112 try:
112 try:
113 msg = self.session.unserialize(msg)
113 msg = self.session.deserialize(msg)
114 except:
114 except:
115 self.log.error("Bad kernel_info reply", exc_info=True)
115 self.log.error("Bad kernel_info reply", exc_info=True)
116 self._request_kernel_info()
116 self._request_kernel_info()
@@ -150,6 +150,9 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
150 self.log.info("%s closed, closing websocket.", self)
150 self.log.info("%s closed, closing websocket.", self)
151 self.close()
151 self.close()
152 return
152 return
153 if isinstance(msg, bytes):
154 msg = deserialize_binary_message(msg)
155 else:
153 msg = json.loads(msg)
156 msg = json.loads(msg)
154 self.session.send(self.zmq_stream, msg)
157 self.session.send(self.zmq_stream, msg)
155
158
1 NO CONTENT: modified file
NO CONTENT: modified file
@@ -1,1 +1,1 b''
1 Subproject commit 56b35d85bb0ea150458282f4064292a5c211025a
1 Subproject commit 1968f4f78d7e8cd227d0b3f4cc3183591969b52a
@@ -129,12 +129,12 b' define(['
129 return this.kernel.send_shell_message("comm_open", content, callbacks, metadata);
129 return this.kernel.send_shell_message("comm_open", content, callbacks, metadata);
130 };
130 };
131
131
132 Comm.prototype.send = function (data, callbacks, metadata) {
132 Comm.prototype.send = function (data, callbacks, metadata, buffers) {
133 var content = {
133 var content = {
134 comm_id : this.comm_id,
134 comm_id : this.comm_id,
135 data : data || {},
135 data : data || {},
136 };
136 };
137 return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata);
137 return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata, buffers);
138 };
138 };
139
139
140 Comm.prototype.close = function (data, callbacks, metadata) {
140 Comm.prototype.close = function (data, callbacks, metadata) {
@@ -5,9 +5,10 b' define(['
5 'base/js/namespace',
5 'base/js/namespace',
6 'jquery',
6 'jquery',
7 'base/js/utils',
7 'base/js/utils',
8 'services/kernels/js/comm',
8 './comm',
9 'widgets/js/init',
9 './serialize',
10 ], function(IPython, $, utils, comm, widgetmanager) {
10 'widgets/js/init'
11 ], function(IPython, $, utils, comm, serialize, widgetmanager) {
11 "use strict";
12 "use strict";
12
13
13 /**
14 /**
@@ -69,7 +70,7 b' define(['
69 /**
70 /**
70 * @function _get_msg
71 * @function _get_msg
71 */
72 */
72 Kernel.prototype._get_msg = function (msg_type, content, metadata) {
73 Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) {
73 var msg = {
74 var msg = {
74 header : {
75 header : {
75 msg_id : utils.uuid(),
76 msg_id : utils.uuid(),
@@ -80,6 +81,7 b' define(['
80 },
81 },
81 metadata : metadata || {},
82 metadata : metadata || {},
82 content : content,
83 content : content,
84 buffers : buffers || [],
83 parent_header : {}
85 parent_header : {}
84 };
86 };
85 return msg;
87 return msg;
@@ -596,12 +598,12 b' define(['
596 *
598 *
597 * @function send_shell_message
599 * @function send_shell_message
598 */
600 */
599 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata) {
601 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
600 if (!this.is_connected()) {
602 if (!this.is_connected()) {
601 throw new Error("kernel is not connected");
603 throw new Error("kernel is not connected");
602 }
604 }
603 var msg = this._get_msg(msg_type, content, metadata);
605 var msg = this._get_msg(msg_type, content, metadata, buffers);
604 this.channels.shell.send(JSON.stringify(msg));
606 this.channels.shell.send(serialize.serialize(msg));
605 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
607 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
606 return msg.header.msg_id;
608 return msg.header.msg_id;
607 };
609 };
@@ -752,7 +754,7 b' define(['
752 };
754 };
753 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
755 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
754 var msg = this._get_msg("input_reply", content);
756 var msg = this._get_msg("input_reply", content);
755 this.channels.stdin.send(JSON.stringify(msg));
757 this.channels.stdin.send(serialize.serialize(msg));
756 return msg.header.msg_id;
758 return msg.header.msg_id;
757 };
759 };
758
760
@@ -850,7 +852,10 b' define(['
850 * @function _handle_shell_reply
852 * @function _handle_shell_reply
851 */
853 */
852 Kernel.prototype._handle_shell_reply = function (e) {
854 Kernel.prototype._handle_shell_reply = function (e) {
853 var reply = $.parseJSON(e.data);
855 serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
856 };
857
858 Kernel.prototype._finish_shell_reply = function (reply) {
854 this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply});
859 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
855 var content = reply.content;
860 var content = reply.content;
856 var metadata = reply.metadata;
861 var metadata = reply.metadata;
@@ -978,8 +983,11 b' define(['
978 * @function _handle_iopub_message
983 * @function _handle_iopub_message
979 */
984 */
980 Kernel.prototype._handle_iopub_message = function (e) {
985 Kernel.prototype._handle_iopub_message = function (e) {
981 var msg = $.parseJSON(e.data);
986 serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
987 };
988
982
989
990 Kernel.prototype._finish_iopub_message = function (msg) {
983 var handler = this.get_iopub_handler(msg.header.msg_type);
991 var handler = this.get_iopub_handler(msg.header.msg_type);
984 if (handler !== undefined) {
992 if (handler !== undefined) {
985 handler(msg);
993 handler(msg);
@@ -990,7 +998,11 b' define(['
990 * @function _handle_input_request
998 * @function _handle_input_request
991 */
999 */
992 Kernel.prototype._handle_input_request = function (e) {
1000 Kernel.prototype._handle_input_request = function (e) {
993 var request = $.parseJSON(e.data);
1001 serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
1002 };
1003
1004
1005 Kernel.prototype._finish_input_request = function (request) {
994 var header = request.header;
1006 var header = request.header;
995 var content = request.content;
1007 var content = request.content;
996 var metadata = request.metadata;
1008 var metadata = request.metadata;
@@ -317,6 +317,7 b' class="notebook_app"'
317 {% block script %}
317 {% block script %}
318 {{super()}}
318 {{super()}}
319
319
320 <script src="{{ static_url("components/text-encoding/lib/encoding.js") }}" charset="utf-8"></script>
320
321
321 <script src="{{ static_url("notebook/js/main.js") }}" charset="utf-8"></script>
322 <script src="{{ static_url("notebook/js/main.js") }}" charset="utf-8"></script>
322
323
@@ -176,7 +176,7 b' class ZMQSocketChannel(Thread):'
176 Unpacks message, and calls handlers with it.
176 Unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.unserialize(smsg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
180 self.call_handlers(msg)
181
181
182
182
@@ -57,7 +57,7 b' class Comm(LoggingConfigurable):'
57 # I am primary, open my peer.
57 # I am primary, open my peer.
58 self.open(data)
58 self.open(data)
59
59
60 def _publish_msg(self, msg_type, data=None, metadata=None, **keys):
60 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
61 """Helper for sending a comm message on IOPub"""
61 """Helper for sending a comm message on IOPub"""
62 data = {} if data is None else data
62 data = {} if data is None else data
63 metadata = {} if metadata is None else metadata
63 metadata = {} if metadata is None else metadata
@@ -67,6 +67,7 b' class Comm(LoggingConfigurable):'
67 metadata=json_clean(metadata),
67 metadata=json_clean(metadata),
68 parent=self.kernel._parent_header,
68 parent=self.kernel._parent_header,
69 ident=self.topic,
69 ident=self.topic,
70 buffers=buffers,
70 )
71 )
71
72
72 def __del__(self):
73 def __del__(self):
@@ -75,7 +76,7 b' class Comm(LoggingConfigurable):'
75
76
76 # publishing messages
77 # publishing messages
77
78
78 def open(self, data=None, metadata=None):
79 def open(self, data=None, metadata=None, buffers=None):
79 """Open the frontend-side version of this comm"""
80 """Open the frontend-side version of this comm"""
80 if data is None:
81 if data is None:
81 data = self._open_data
82 data = self._open_data
@@ -86,22 +87,29 b' class Comm(LoggingConfigurable):'
86
87
87 comm_manager.register_comm(self)
88 comm_manager.register_comm(self)
88 self._closed = False
89 self._closed = False
89 self._publish_msg('comm_open', data, metadata, target_name=self.target_name)
90 self._publish_msg('comm_open',
91 data=data, metadata=metadata, buffers=buffers,
92 target_name=self.target_name,
93 )
90
94
91 def close(self, data=None, metadata=None):
95 def close(self, data=None, metadata=None, buffers=None):
92 """Close the frontend-side version of this comm"""
96 """Close the frontend-side version of this comm"""
93 if self._closed:
97 if self._closed:
94 # only close once
98 # only close once
95 return
99 return
96 if data is None:
100 if data is None:
97 data = self._close_data
101 data = self._close_data
98 self._publish_msg('comm_close', data, metadata)
102 self._publish_msg('comm_close',
103 data=data, metadata=metadata, buffers=buffers,
104 )
99 self.kernel.comm_manager.unregister_comm(self)
105 self.kernel.comm_manager.unregister_comm(self)
100 self._closed = True
106 self._closed = True
101
107
102 def send(self, data=None, metadata=None):
108 def send(self, data=None, metadata=None, buffers=None):
103 """Send a message to the frontend-side version of this comm"""
109 """Send a message to the frontend-side version of this comm"""
104 self._publish_msg('comm_msg', data, metadata)
110 self._publish_msg('comm_msg',
111 data=data, metadata=metadata, buffers=buffers,
112 )
105
113
106 # registering callbacks
114 # registering callbacks
107
115
@@ -130,7 +130,7 b' class Kernel(SingletonConfigurable):'
130 """dispatch control requests"""
130 """dispatch control requests"""
131 idents,msg = self.session.feed_identities(msg, copy=False)
131 idents,msg = self.session.feed_identities(msg, copy=False)
132 try:
132 try:
133 msg = self.session.unserialize(msg, content=True, copy=False)
133 msg = self.session.deserialize(msg, content=True, copy=False)
134 except:
134 except:
135 self.log.error("Invalid Control Message", exc_info=True)
135 self.log.error("Invalid Control Message", exc_info=True)
136 return
136 return
@@ -165,7 +165,7 b' class Kernel(SingletonConfigurable):'
165
165
166 idents,msg = self.session.feed_identities(msg, copy=False)
166 idents,msg = self.session.feed_identities(msg, copy=False)
167 try:
167 try:
168 msg = self.session.unserialize(msg, content=True, copy=False)
168 msg = self.session.deserialize(msg, content=True, copy=False)
169 except:
169 except:
170 self.log.error("Invalid Message", exc_info=True)
170 self.log.error("Invalid Message", exc_info=True)
171 return
171 return
@@ -89,7 +89,7 b' def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):'
89 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
89 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
90 return buffers
90 return buffers
91
91
92 def unserialize_object(buffers, g=None):
92 def deserialize_object(buffers, g=None):
93 """reconstruct an object serialized by serialize_object from data buffers.
93 """reconstruct an object serialized by serialize_object from data buffers.
94
94
95 Parameters
95 Parameters
@@ -170,14 +170,14 b' def unpack_apply_message(bufs, g=None, copy=True):'
170
170
171 args = []
171 args = []
172 for i in range(info['nargs']):
172 for i in range(info['nargs']):
173 arg, arg_bufs = unserialize_object(arg_bufs, g)
173 arg, arg_bufs = deserialize_object(arg_bufs, g)
174 args.append(arg)
174 args.append(arg)
175 args = tuple(args)
175 args = tuple(args)
176 assert not arg_bufs, "Shouldn't be any arg bufs left over"
176 assert not arg_bufs, "Shouldn't be any arg bufs left over"
177
177
178 kwargs = {}
178 kwargs = {}
179 for key in info['kw_keys']:
179 for key in info['kw_keys']:
180 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
180 kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
181 kwargs[key] = kwarg
181 kwargs[key] = kwarg
182 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
182 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
183
183
@@ -18,6 +18,7 b' import os'
18 import pprint
18 import pprint
19 import random
19 import random
20 import uuid
20 import uuid
21 import warnings
21 from datetime import datetime
22 from datetime import datetime
22
23
23 try:
24 try:
@@ -492,7 +493,7 b' class Session(Configurable):'
492 """Return the nested message dict.
493 """Return the nested message dict.
493
494
494 This format is different from what is sent over the wire. The
495 This format is different from what is sent over the wire. The
495 serialize/unserialize methods converts this nested message dict to the wire
496 serialize/deserialize methods converts this nested message dict to the wire
496 format, which is a list of message parts.
497 format, which is a list of message parts.
497 """
498 """
498 msg = {}
499 msg = {}
@@ -525,7 +526,7 b' class Session(Configurable):'
525 def serialize(self, msg, ident=None):
526 def serialize(self, msg, ident=None):
526 """Serialize the message components to bytes.
527 """Serialize the message components to bytes.
527
528
528 This is roughly the inverse of unserialize. The serialize/unserialize
529 This is roughly the inverse of deserialize. The serialize/deserialize
529 methods work with full message lists, whereas pack/unpack work with
530 methods work with full message lists, whereas pack/unpack work with
530 the individual message parts in the message list.
531 the individual message parts in the message list.
531
532
@@ -590,7 +591,7 b' class Session(Configurable):'
590 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
591 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
591 buffer1,buffer2,...]
592 buffer1,buffer2,...]
592
593
593 The serialize/unserialize methods convert the nested message dict into this
594 The serialize/deserialize methods convert the nested message dict into this
594 format.
595 format.
595
596
596 Parameters
597 Parameters
@@ -634,6 +635,7 b' class Session(Configurable):'
634 # We got a Message or message dict, not a msg_type so don't
635 # We got a Message or message dict, not a msg_type so don't
635 # build a new Message.
636 # build a new Message.
636 msg = msg_or_type
637 msg = msg_or_type
638 buffers = buffers or msg.get('buffers', [])
637 else:
639 else:
638 msg = self.msg(msg_or_type, content=content, parent=parent,
640 msg = self.msg(msg_or_type, content=content, parent=parent,
639 header=header, metadata=metadata)
641 header=header, metadata=metadata)
@@ -722,7 +724,7 b' class Session(Configurable):'
722 # invalid large messages can cause very expensive string comparisons
724 # invalid large messages can cause very expensive string comparisons
723 idents, msg_list = self.feed_identities(msg_list, copy)
725 idents, msg_list = self.feed_identities(msg_list, copy)
724 try:
726 try:
725 return idents, self.unserialize(msg_list, content=content, copy=copy)
727 return idents, self.deserialize(msg_list, content=content, copy=copy)
726 except Exception as e:
728 except Exception as e:
727 # TODO: handle it
729 # TODO: handle it
728 raise e
730 raise e
@@ -747,7 +749,7 b' class Session(Configurable):'
747 idents will always be a list of bytes, each of which is a ZMQ
749 idents will always be a list of bytes, each of which is a ZMQ
748 identity. msg_list will be a list of bytes or zmq.Messages of the
750 identity. msg_list will be a list of bytes or zmq.Messages of the
749 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
751 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
750 should be unpackable/unserializable via self.unserialize at this
752 should be unpackable/unserializable via self.deserialize at this
751 point.
753 point.
752 """
754 """
753 if copy:
755 if copy:
@@ -788,10 +790,10 b' class Session(Configurable):'
788 to_cull = random.sample(self.digest_history, n_to_cull)
790 to_cull = random.sample(self.digest_history, n_to_cull)
789 self.digest_history.difference_update(to_cull)
791 self.digest_history.difference_update(to_cull)
790
792
791 def unserialize(self, msg_list, content=True, copy=True):
793 def deserialize(self, msg_list, content=True, copy=True):
792 """Unserialize a msg_list to a nested message dict.
794 """Unserialize a msg_list to a nested message dict.
793
795
794 This is roughly the inverse of serialize. The serialize/unserialize
796 This is roughly the inverse of serialize. The serialize/deserialize
795 methods work with full message lists, whereas pack/unpack work with
797 methods work with full message lists, whereas pack/unpack work with
796 the individual message parts in the message list.
798 the individual message parts in the message list.
797
799
@@ -842,10 +844,16 b' class Session(Configurable):'
842 message['content'] = msg_list[4]
844 message['content'] = msg_list[4]
843
845
844 message['buffers'] = msg_list[5:]
846 message['buffers'] = msg_list[5:]
845 # print("received: %s: %s\n %s" % (message['msg_type'], message['header'], message['content']))
846 # adapt to the current version
847 # adapt to the current version
847 return adapt(message)
848 return adapt(message)
848 # print("adapted: %s: %s\n %s" % (adapted['msg_type'], adapted['header'], adapted['content']))
849
850 def unserialize(self, *args, **kwargs):
851 warnings.warn(
852 "Session.unserialize is deprecated. Use Session.deserialize.",
853 DeprecationWarning,
854 )
855 return self.deserialize(*args, **kwargs)
856
849
857
850 def test_msg2obj():
858 def test_msg2obj():
851 am = dict(x=1)
859 am = dict(x=1)
@@ -9,7 +9,7 b' from collections import namedtuple'
9 import nose.tools as nt
9 import nose.tools as nt
10
10
11 # from unittest import TestCaes
11 # from unittest import TestCaes
12 from IPython.kernel.zmq.serialize import serialize_object, unserialize_object
12 from IPython.kernel.zmq.serialize import serialize_object, deserialize_object
13 from IPython.testing import decorators as dec
13 from IPython.testing import decorators as dec
14 from IPython.utils.pickleutil import CannedArray, CannedClass
14 from IPython.utils.pickleutil import CannedArray, CannedClass
15 from IPython.utils.py3compat import iteritems
15 from IPython.utils.py3compat import iteritems
@@ -22,7 +22,7 b' from IPython.parallel import interactive'
22 def roundtrip(obj):
22 def roundtrip(obj):
23 """roundtrip an object through serialization"""
23 """roundtrip an object through serialization"""
24 bufs = serialize_object(obj)
24 bufs = serialize_object(obj)
25 obj2, remainder = unserialize_object(bufs)
25 obj2, remainder = deserialize_object(bufs)
26 nt.assert_equals(remainder, [])
26 nt.assert_equals(remainder, [])
27 return obj2
27 return obj2
28
28
@@ -70,7 +70,7 b' def test_roundtrip_buffered():'
70 ]:
70 ]:
71 bufs = serialize_object(obj)
71 bufs = serialize_object(obj)
72 nt.assert_equal(len(bufs), 2)
72 nt.assert_equal(len(bufs), 2)
73 obj2, remainder = unserialize_object(bufs)
73 obj2, remainder = deserialize_object(bufs)
74 nt.assert_equal(remainder, [])
74 nt.assert_equal(remainder, [])
75 nt.assert_equal(obj, obj2)
75 nt.assert_equal(obj, obj2)
76
76
@@ -82,7 +82,7 b' def test_numpy():'
82 for dtype in DTYPES:
82 for dtype in DTYPES:
83 A = new_array(shape, dtype=dtype)
83 A = new_array(shape, dtype=dtype)
84 bufs = serialize_object(A)
84 bufs = serialize_object(A)
85 B, r = unserialize_object(bufs)
85 B, r = deserialize_object(bufs)
86 nt.assert_equal(r, [])
86 nt.assert_equal(r, [])
87 nt.assert_equal(A.shape, B.shape)
87 nt.assert_equal(A.shape, B.shape)
88 nt.assert_equal(A.dtype, B.dtype)
88 nt.assert_equal(A.dtype, B.dtype)
@@ -100,7 +100,7 b' def test_recarray():'
100 A = new_array(shape, dtype=dtype)
100 A = new_array(shape, dtype=dtype)
101
101
102 bufs = serialize_object(A)
102 bufs = serialize_object(A)
103 B, r = unserialize_object(bufs)
103 B, r = deserialize_object(bufs)
104 nt.assert_equal(r, [])
104 nt.assert_equal(r, [])
105 nt.assert_equal(A.shape, B.shape)
105 nt.assert_equal(A.shape, B.shape)
106 nt.assert_equal(A.dtype, B.dtype)
106 nt.assert_equal(A.dtype, B.dtype)
@@ -116,7 +116,7 b' def test_numpy_in_seq():'
116 bufs = serialize_object((A,1,2,b'hello'))
116 bufs = serialize_object((A,1,2,b'hello'))
117 canned = pickle.loads(bufs[0])
117 canned = pickle.loads(bufs[0])
118 nt.assert_is_instance(canned[0], CannedArray)
118 nt.assert_is_instance(canned[0], CannedArray)
119 tup, r = unserialize_object(bufs)
119 tup, r = deserialize_object(bufs)
120 B = tup[0]
120 B = tup[0]
121 nt.assert_equal(r, [])
121 nt.assert_equal(r, [])
122 nt.assert_equal(A.shape, B.shape)
122 nt.assert_equal(A.shape, B.shape)
@@ -133,7 +133,7 b' def test_numpy_in_dict():'
133 bufs = serialize_object(dict(a=A,b=1,c=range(20)))
133 bufs = serialize_object(dict(a=A,b=1,c=range(20)))
134 canned = pickle.loads(bufs[0])
134 canned = pickle.loads(bufs[0])
135 nt.assert_is_instance(canned['a'], CannedArray)
135 nt.assert_is_instance(canned['a'], CannedArray)
136 d, r = unserialize_object(bufs)
136 d, r = deserialize_object(bufs)
137 B = d['a']
137 B = d['a']
138 nt.assert_equal(r, [])
138 nt.assert_equal(r, [])
139 nt.assert_equal(A.shape, B.shape)
139 nt.assert_equal(A.shape, B.shape)
@@ -147,7 +147,7 b' def test_class():'
147 bufs = serialize_object(dict(C=C))
147 bufs = serialize_object(dict(C=C))
148 canned = pickle.loads(bufs[0])
148 canned = pickle.loads(bufs[0])
149 nt.assert_is_instance(canned['C'], CannedClass)
149 nt.assert_is_instance(canned['C'], CannedClass)
150 d, r = unserialize_object(bufs)
150 d, r = deserialize_object(bufs)
151 C2 = d['C']
151 C2 = d['C']
152 nt.assert_equal(C2.a, C.a)
152 nt.assert_equal(C2.a, C.a)
153
153
@@ -159,7 +159,7 b' def test_class_oldstyle():'
159 bufs = serialize_object(dict(C=C))
159 bufs = serialize_object(dict(C=C))
160 canned = pickle.loads(bufs[0])
160 canned = pickle.loads(bufs[0])
161 nt.assert_is_instance(canned['C'], CannedClass)
161 nt.assert_is_instance(canned['C'], CannedClass)
162 d, r = unserialize_object(bufs)
162 d, r = deserialize_object(bufs)
163 C2 = d['C']
163 C2 = d['C']
164 nt.assert_equal(C2.a, C.a)
164 nt.assert_equal(C2.a, C.a)
165
165
@@ -168,7 +168,7 b' def test_tuple():'
168 bufs = serialize_object(tup)
168 bufs = serialize_object(tup)
169 canned = pickle.loads(bufs[0])
169 canned = pickle.loads(bufs[0])
170 nt.assert_is_instance(canned, tuple)
170 nt.assert_is_instance(canned, tuple)
171 t2, r = unserialize_object(bufs)
171 t2, r = deserialize_object(bufs)
172 nt.assert_equal(t2[0](t2[1]), tup[0](tup[1]))
172 nt.assert_equal(t2[0](t2[1]), tup[0](tup[1]))
173
173
174 point = namedtuple('point', 'x y')
174 point = namedtuple('point', 'x y')
@@ -178,7 +178,7 b' def test_namedtuple():'
178 bufs = serialize_object(p)
178 bufs = serialize_object(p)
179 canned = pickle.loads(bufs[0])
179 canned = pickle.loads(bufs[0])
180 nt.assert_is_instance(canned, point)
180 nt.assert_is_instance(canned, point)
181 p2, r = unserialize_object(bufs, globals())
181 p2, r = deserialize_object(bufs, globals())
182 nt.assert_equal(p2.x, p.x)
182 nt.assert_equal(p2.x, p.x)
183 nt.assert_equal(p2.y, p.y)
183 nt.assert_equal(p2.y, p.y)
184
184
@@ -187,7 +187,7 b' def test_list():'
187 bufs = serialize_object(lis)
187 bufs = serialize_object(lis)
188 canned = pickle.loads(bufs[0])
188 canned = pickle.loads(bufs[0])
189 nt.assert_is_instance(canned, list)
189 nt.assert_is_instance(canned, list)
190 l2, r = unserialize_object(bufs)
190 l2, r = deserialize_object(bufs)
191 nt.assert_equal(l2[0](l2[1]), lis[0](lis[1]))
191 nt.assert_equal(l2[0](l2[1]), lis[0](lis[1]))
192
192
193 def test_class_inheritance():
193 def test_class_inheritance():
@@ -202,7 +202,7 b' def test_class_inheritance():'
202 bufs = serialize_object(dict(D=D))
202 bufs = serialize_object(dict(D=D))
203 canned = pickle.loads(bufs[0])
203 canned = pickle.loads(bufs[0])
204 nt.assert_is_instance(canned['D'], CannedClass)
204 nt.assert_is_instance(canned['D'], CannedClass)
205 d, r = unserialize_object(bufs)
205 d, r = deserialize_object(bufs)
206 D2 = d['D']
206 D2 = d['D']
207 nt.assert_equal(D2.a, D.a)
207 nt.assert_equal(D2.a, D.a)
208 nt.assert_equal(D2.b, D.b)
208 nt.assert_equal(D2.b, D.b)
@@ -60,7 +60,7 b' class TestSession(SessionTestCase):'
60 msg = self.session.msg('execute', content=dict(a=10, b=1.1))
60 msg = self.session.msg('execute', content=dict(a=10, b=1.1))
61 msg_list = self.session.serialize(msg, ident=b'foo')
61 msg_list = self.session.serialize(msg, ident=b'foo')
62 ident, msg_list = self.session.feed_identities(msg_list)
62 ident, msg_list = self.session.feed_identities(msg_list)
63 new_msg = self.session.unserialize(msg_list)
63 new_msg = self.session.deserialize(msg_list)
64 self.assertEqual(ident[0], b'foo')
64 self.assertEqual(ident[0], b'foo')
65 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
65 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
66 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
66 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -82,7 +82,7 b' class TestSession(SessionTestCase):'
82 self.session.send(A, msg, ident=b'foo', buffers=[b'bar'])
82 self.session.send(A, msg, ident=b'foo', buffers=[b'bar'])
83
83
84 ident, msg_list = self.session.feed_identities(B.recv_multipart())
84 ident, msg_list = self.session.feed_identities(B.recv_multipart())
85 new_msg = self.session.unserialize(msg_list)
85 new_msg = self.session.deserialize(msg_list)
86 self.assertEqual(ident[0], b'foo')
86 self.assertEqual(ident[0], b'foo')
87 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
87 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
88 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
88 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -100,7 +100,7 b' class TestSession(SessionTestCase):'
100 self.session.send(A, None, content=content, parent=parent,
100 self.session.send(A, None, content=content, parent=parent,
101 header=header, metadata=metadata, ident=b'foo', buffers=[b'bar'])
101 header=header, metadata=metadata, ident=b'foo', buffers=[b'bar'])
102 ident, msg_list = self.session.feed_identities(B.recv_multipart())
102 ident, msg_list = self.session.feed_identities(B.recv_multipart())
103 new_msg = self.session.unserialize(msg_list)
103 new_msg = self.session.deserialize(msg_list)
104 self.assertEqual(ident[0], b'foo')
104 self.assertEqual(ident[0], b'foo')
105 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
105 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
106 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
106 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -263,7 +263,7 b' class TestSession(SessionTestCase):'
263 p = session.msg('msg')
263 p = session.msg('msg')
264 msg = session.msg('msg', content=content, metadata=metadata, parent=p['header'])
264 msg = session.msg('msg', content=content, metadata=metadata, parent=p['header'])
265 smsg = session.serialize(msg)
265 smsg = session.serialize(msg)
266 msg2 = session.unserialize(session.feed_identities(smsg)[1])
266 msg2 = session.deserialize(session.feed_identities(smsg)[1])
267 assert isinstance(msg2['header']['date'], datetime)
267 assert isinstance(msg2['header']['date'], datetime)
268 self.assertEqual(msg['header'], msg2['header'])
268 self.assertEqual(msg['header'], msg2['header'])
269 self.assertEqual(msg['parent_header'], msg2['parent_header'])
269 self.assertEqual(msg['parent_header'], msg2['parent_header'])
@@ -305,7 +305,7 b' class TestSession(SessionTestCase):'
305 self.session.send_raw(A, msg_list, ident=b'foo')
305 self.session.send_raw(A, msg_list, ident=b'foo')
306
306
307 ident, new_msg_list = self.session.feed_identities(B.recv_multipart())
307 ident, new_msg_list = self.session.feed_identities(B.recv_multipart())
308 new_msg = self.session.unserialize(new_msg_list)
308 new_msg = self.session.deserialize(new_msg_list)
309 self.assertEqual(ident[0], b'foo')
309 self.assertEqual(ident[0], b'foo')
310 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
310 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
311 self.assertEqual(new_msg['header'],msg['header'])
311 self.assertEqual(new_msg['header'],msg['header'])
@@ -801,7 +801,7 b' class Client(HasTraits):'
801
801
802 # construct result:
802 # construct result:
803 if content['status'] == 'ok':
803 if content['status'] == 'ok':
804 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
804 self.results[msg_id] = serialize.deserialize_object(msg['buffers'])[0]
805 elif content['status'] == 'aborted':
805 elif content['status'] == 'aborted':
806 self.results[msg_id] = error.TaskAborted(msg_id)
806 self.results[msg_id] = error.TaskAborted(msg_id)
807 elif content['status'] == 'resubmitted':
807 elif content['status'] == 'resubmitted':
@@ -903,7 +903,7 b' class Client(HasTraits):'
903 elif msg_type == 'execute_result':
903 elif msg_type == 'execute_result':
904 md['execute_result'] = content
904 md['execute_result'] = content
905 elif msg_type == 'data_message':
905 elif msg_type == 'data_message':
906 data, remainder = serialize.unserialize_object(msg['buffers'])
906 data, remainder = serialize.deserialize_object(msg['buffers'])
907 md['data'].update(data)
907 md['data'].update(data)
908 elif msg_type == 'status':
908 elif msg_type == 'status':
909 # idle message comes after all outputs
909 # idle message comes after all outputs
@@ -1612,7 +1612,7 b' class Client(HasTraits):'
1612
1612
1613 if rcontent['status'] == 'ok':
1613 if rcontent['status'] == 'ok':
1614 if header['msg_type'] == 'apply_reply':
1614 if header['msg_type'] == 'apply_reply':
1615 res,buffers = serialize.unserialize_object(buffers)
1615 res,buffers = serialize.deserialize_object(buffers)
1616 elif header['msg_type'] == 'execute_reply':
1616 elif header['msg_type'] == 'execute_reply':
1617 res = ExecuteReply(msg_id, rcontent, md)
1617 res = ExecuteReply(msg_id, rcontent, md)
1618 else:
1618 else:
@@ -523,7 +523,7 b' class Hub(SessionFactory):'
523 return
523 return
524 client_id = idents[0]
524 client_id = idents[0]
525 try:
525 try:
526 msg = self.session.unserialize(msg, content=True)
526 msg = self.session.deserialize(msg, content=True)
527 except Exception:
527 except Exception:
528 content = error.wrap_exception()
528 content = error.wrap_exception()
529 self.log.error("Bad Query Message: %r", msg, exc_info=True)
529 self.log.error("Bad Query Message: %r", msg, exc_info=True)
@@ -588,7 +588,7 b' class Hub(SessionFactory):'
588 return
588 return
589 queue_id, client_id = idents[:2]
589 queue_id, client_id = idents[:2]
590 try:
590 try:
591 msg = self.session.unserialize(msg)
591 msg = self.session.deserialize(msg)
592 except Exception:
592 except Exception:
593 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
593 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
594 return
594 return
@@ -636,7 +636,7 b' class Hub(SessionFactory):'
636
636
637 client_id, queue_id = idents[:2]
637 client_id, queue_id = idents[:2]
638 try:
638 try:
639 msg = self.session.unserialize(msg)
639 msg = self.session.deserialize(msg)
640 except Exception:
640 except Exception:
641 self.log.error("queue::engine %r sent invalid message to %r: %r",
641 self.log.error("queue::engine %r sent invalid message to %r: %r",
642 queue_id, client_id, msg, exc_info=True)
642 queue_id, client_id, msg, exc_info=True)
@@ -690,7 +690,7 b' class Hub(SessionFactory):'
690 client_id = idents[0]
690 client_id = idents[0]
691
691
692 try:
692 try:
693 msg = self.session.unserialize(msg)
693 msg = self.session.deserialize(msg)
694 except Exception:
694 except Exception:
695 self.log.error("task::client %r sent invalid task message: %r",
695 self.log.error("task::client %r sent invalid task message: %r",
696 client_id, msg, exc_info=True)
696 client_id, msg, exc_info=True)
@@ -740,7 +740,7 b' class Hub(SessionFactory):'
740 """save the result of a completed task."""
740 """save the result of a completed task."""
741 client_id = idents[0]
741 client_id = idents[0]
742 try:
742 try:
743 msg = self.session.unserialize(msg)
743 msg = self.session.deserialize(msg)
744 except Exception:
744 except Exception:
745 self.log.error("task::invalid task result message send to %r: %r",
745 self.log.error("task::invalid task result message send to %r: %r",
746 client_id, msg, exc_info=True)
746 client_id, msg, exc_info=True)
@@ -794,7 +794,7 b' class Hub(SessionFactory):'
794
794
795 def save_task_destination(self, idents, msg):
795 def save_task_destination(self, idents, msg):
796 try:
796 try:
797 msg = self.session.unserialize(msg, content=True)
797 msg = self.session.deserialize(msg, content=True)
798 except Exception:
798 except Exception:
799 self.log.error("task::invalid task tracking message", exc_info=True)
799 self.log.error("task::invalid task tracking message", exc_info=True)
800 return
800 return
@@ -831,7 +831,7 b' class Hub(SessionFactory):'
831 """save an iopub message into the db"""
831 """save an iopub message into the db"""
832 # print (topics)
832 # print (topics)
833 try:
833 try:
834 msg = self.session.unserialize(msg, content=True)
834 msg = self.session.deserialize(msg, content=True)
835 except Exception:
835 except Exception:
836 self.log.error("iopub::invalid IOPub message", exc_info=True)
836 self.log.error("iopub::invalid IOPub message", exc_info=True)
837 return
837 return
@@ -251,7 +251,7 b' class TaskScheduler(SessionFactory):'
251 self.log.warn("task::Invalid Message: %r",msg)
251 self.log.warn("task::Invalid Message: %r",msg)
252 return
252 return
253 try:
253 try:
254 msg = self.session.unserialize(msg)
254 msg = self.session.deserialize(msg)
255 except ValueError:
255 except ValueError:
256 self.log.warn("task::Unauthorized message from: %r"%idents)
256 self.log.warn("task::Unauthorized message from: %r"%idents)
257 return
257 return
@@ -270,7 +270,7 b' class TaskScheduler(SessionFactory):'
270 self.log.warn("task::Invalid Message: %r",msg)
270 self.log.warn("task::Invalid Message: %r",msg)
271 return
271 return
272 try:
272 try:
273 msg = self.session.unserialize(msg)
273 msg = self.session.deserialize(msg)
274 except ValueError:
274 except ValueError:
275 self.log.warn("task::Unauthorized message from: %r"%idents)
275 self.log.warn("task::Unauthorized message from: %r"%idents)
276 return
276 return
@@ -375,7 +375,7 b' class TaskScheduler(SessionFactory):'
375 self.notifier_stream.flush()
375 self.notifier_stream.flush()
376 try:
376 try:
377 idents, msg = self.session.feed_identities(raw_msg, copy=False)
377 idents, msg = self.session.feed_identities(raw_msg, copy=False)
378 msg = self.session.unserialize(msg, content=False, copy=False)
378 msg = self.session.deserialize(msg, content=False, copy=False)
379 except Exception:
379 except Exception:
380 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
380 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
381 return
381 return
@@ -621,7 +621,7 b' class TaskScheduler(SessionFactory):'
621 """dispatch method for result replies"""
621 """dispatch method for result replies"""
622 try:
622 try:
623 idents,msg = self.session.feed_identities(raw_msg, copy=False)
623 idents,msg = self.session.feed_identities(raw_msg, copy=False)
624 msg = self.session.unserialize(msg, content=False, copy=False)
624 msg = self.session.deserialize(msg, content=False, copy=False)
625 engine = idents[0]
625 engine = idents[0]
626 try:
626 try:
627 idx = self.targets.index(engine)
627 idx = self.targets.index(engine)
@@ -159,7 +159,7 b' class EngineFactory(RegistrationFactory):'
159 loop = self.loop
159 loop = self.loop
160 identity = self.bident
160 identity = self.bident
161 idents,msg = self.session.feed_identities(msg)
161 idents,msg = self.session.feed_identities(msg)
162 msg = self.session.unserialize(msg)
162 msg = self.session.deserialize(msg)
163 content = msg['content']
163 content = msg['content']
164 info = self.connection_info
164 info = self.connection_info
165
165
@@ -162,6 +162,7 b' def find_package_data():'
162 pjoin(components, "underscore", "underscore-min.js"),
162 pjoin(components, "underscore", "underscore-min.js"),
163 pjoin(components, "moment", "moment.js"),
163 pjoin(components, "moment", "moment.js"),
164 pjoin(components, "moment", "min","moment.min.js"),
164 pjoin(components, "moment", "min", "moment.min.js"),
165 pjoin(components, "text-encoding", "lib", "encoding.js"),
165 ])
166 ])
166
167
167 # Ship all of Codemirror's CSS and JS
168 # Ship all of Codemirror's CSS and JS
General Comments 0
You need to be logged in to leave comments. Login now