##// END OF EJS Templates
Merge pull request #6110 from minrk/binarycomm...
Matthias Bussonnier -
r18418:2a8a2c87 merge
parent child Browse files
Show More
@@ -0,0 +1,114 b''
1 // Copyright (c) IPython Development Team.
2 // Distributed under the terms of the Modified BSD License.
3
4 define([
5 'underscore',
6 ], function (_) {
7 "use strict";
8
9 var _deserialize_array_buffer = function (buf) {
10 var data = new DataView(buf);
11 // read the header: 1 + nbufs 32b integers
12 var nbufs = data.getUint32(0);
13 var offsets = [];
14 var i;
15 for (i = 1; i <= nbufs; i++) {
16 offsets.push(data.getUint32(i * 4));
17 }
18 var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
19 var msg = JSON.parse(
20 (new TextDecoder('utf8')).decode(json_bytes)
21 );
22 // the remaining chunks are stored as DataViews in msg.buffers
23 msg.buffers = [];
24 var start, stop;
25 for (i = 1; i < nbufs; i++) {
26 start = offsets[i];
27 stop = offsets[i+1] || buf.byteLength;
28 msg.buffers.push(new DataView(buf.slice(start, stop)));
29 }
30 return msg;
31 };
32
33 var _deserialize_binary = function(data, callback) {
34 // deserialize the binary message format
35 // callback will be called with a message whose buffers attribute
36 // will be an array of DataViews.
37 if (data instanceof Blob) {
38 // data is Blob, have to deserialize from ArrayBuffer in reader callback
39 var reader = new FileReader();
40 reader.onload = function () {
41 var msg = _deserialize_array_buffer(this.result);
42 callback(msg);
43 };
44 reader.readAsArrayBuffer(data);
45 } else {
46 // data is ArrayBuffer, can deserialize directly
47 var msg = _deserialize_array_buffer(data);
48 callback(msg);
49 }
50 };
51
52 var deserialize = function (data, callback) {
53 // deserialize a message and pass the unpacked message object to callback
54 if (typeof data === "string") {
55 // text JSON message
56 callback(JSON.parse(data));
57 } else {
58 // binary message
59 _deserialize_binary(data, callback);
60 }
61 };
62
63 var _serialize_binary = function (msg) {
64 // implement the binary serialization protocol
65 // serializes JSON message to ArrayBuffer
66 msg = _.clone(msg);
67 var offsets = [];
68 var buffers = [];
69 msg.buffers.map(function (buf) {
70 buffers.push(buf);
71 });
72 delete msg.buffers;
73 var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg));
74 buffers.unshift(json_utf8);
75 var nbufs = buffers.length;
76 offsets.push(4 * (nbufs + 1));
77 var i;
78 for (i = 0; i + 1 < buffers.length; i++) {
79 offsets.push(offsets[offsets.length-1] + buffers[i].byteLength);
80 }
81 var msg_buf = new Uint8Array(
82 offsets[offsets.length-1] + buffers[buffers.length-1].byteLength
83 );
84 // use DataView.setUint32 for network byte-order
85 var view = new DataView(msg_buf.buffer);
86 // write nbufs to first 4 bytes
87 view.setUint32(0, nbufs);
88 // write offsets to next 4 * nbufs bytes
89 for (i = 0; i < offsets.length; i++) {
90 view.setUint32(4 * (i+1), offsets[i]);
91 }
92 // write all the buffers at their respective offsets
93 for (i = 0; i < buffers.length; i++) {
94 msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]);
95 }
96
97 // return raw ArrayBuffer
98 return msg_buf.buffer;
99 };
100
101 var serialize = function (msg) {
102 if (msg.buffers && msg.buffers.length) {
103 return _serialize_binary(msg);
104 } else {
105 return JSON.stringify(msg);
106 }
107 };
108
109 var exports = {
110 deserialize : deserialize,
111 serialize: serialize
112 };
113 return exports;
114 }); No newline at end of file
@@ -0,0 +1,113 b''
1 //
2 // Test binary messages on websockets.
3 // Only works on slimer for now, due to old websocket impl in phantomjs.
4 //
5
6 casper.notebook_test(function () {
7 if (!this.slimerjs) {
8 console.log("Can't test binary websockets on phantomjs.");
9 return;
10 }
11 // create EchoBuffers target on js-side.
12 // it just captures and echos comm messages.
13 this.then(function () {
14 var success = this.evaluate(function () {
15 IPython._msgs = [];
16
17 var EchoBuffers = function(comm) {
18 this.comm = comm;
19 this.comm.on_msg($.proxy(this.on_msg, this));
20 };
21
22 EchoBuffers.prototype.on_msg = function (msg) {
23 IPython._msgs.push(msg);
24 this.comm.send(msg.content.data, {}, {}, msg.buffers);
25 };
26
27 IPython.notebook.kernel.comm_manager.register_target("echo", function (comm) {
28 return new EchoBuffers(comm);
29 });
30
31 return true;
32 });
33 this.test.assertEquals(success, true, "Created echo comm target");
34 });
35
36 // Create a similar comm that captures messages Python-side
37 this.then(function () {
38 var index = this.append_cell([
39 "import os",
40 "from IPython.kernel.comm import Comm",
41 "comm = Comm(target_name='echo')",
42 "msgs = []",
43 "def on_msg(msg):",
44 " msgs.append(msg)",
45 "comm.on_msg(on_msg)"
46 ].join('\n'), 'code');
47 this.execute_cell(index);
48 });
49
50 // send a message with binary data
51 this.then(function () {
52 var index = this.append_cell([
53 "buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']",
54 "comm.send(data='hi', buffers=buffers)"
55 ].join('\n'), 'code');
56 this.execute_cell(index);
57 });
58
59 // wait for capture
60 this.waitFor(function () {
61 return this.evaluate(function () {
62 return IPython._msgs.length > 0;
63 });
64 });
65
66 // validate captured buffers js-side
67 this.then(function () {
68 var msgs = this.evaluate(function () {
69 return IPython._msgs;
70 });
71 this.test.assertEquals(msgs.length, 1, "Captured comm message");
72 var buffers = msgs[0].buffers;
73 this.test.assertEquals(buffers.length, 2, "comm message has buffers");
74
75 // extract attributes to test in evaluate,
76 // because the raw DataViews can't be passed across
77 var buf_info = function (index) {
78 var buf = IPython._msgs[0].buffers[index];
79 var data = {};
80 data.byteLength = buf.byteLength;
81 data.bytes = [];
82 for (var i = 0; i < data.byteLength; i++) {
83 data.bytes.push(buf.getUint8(i));
84 }
85 return data;
86 };
87
88 buf0 = this.evaluate(buf_info, 0);
89 buf1 = this.evaluate(buf_info, 1);
90 this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size');
91 this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes');
92 this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size');
93 this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes');
94 });
95
96 // validate captured buffers Python-side
97 this.then(function () {
98 var index = this.append_cell([
99 "assert len(msgs) == 1, len(msgs)",
100 "bufs = msgs[0]['buffers']",
101 "assert len(bufs) == len(buffers), bufs",
102 "assert bufs[0].bytes == buffers[0], bufs[0].bytes",
103 "assert bufs[1].bytes == buffers[1], bufs[1].bytes",
104 "1",
105 ].join('\n'), 'code');
106 this.execute_cell(index);
107 this.wait_for_output(index);
108 this.then(function () {
109 var out = this.get_output_cell(index);
110 this.test.assertEquals(out['text/plain'], '1', "Python received buffers");
111 });
112 });
113 });
@@ -0,0 +1,26 b''
1 """Test serialize/deserialize messages with buffers"""
2
3 import os
4
5 import nose.tools as nt
6
7 from IPython.kernel.zmq.session import Session
8 from ..base.zmqhandlers import (
9 serialize_binary_message,
10 deserialize_binary_message,
11 )
12
13 def test_serialize_binary():
14 s = Session()
15 msg = s.msg('data_pub', content={'a': 'b'})
16 msg['buffers'] = [ os.urandom(3) for i in range(3) ]
17 bmsg = serialize_binary_message(msg)
18 nt.assert_is_instance(bmsg, bytes)
19
20 def test_deserialize_binary():
21 s = Session()
22 msg = s.msg('data_pub', content={'a': 'b'})
23 msg['buffers'] = [ os.urandom(2) for i in range(3) ]
24 bmsg = serialize_binary_message(msg)
25 msg2 = deserialize_binary_message(bmsg)
26 nt.assert_equal(msg2, msg)
@@ -4,6 +4,7 b''
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 import struct
7 8
8 9 try:
9 10 from urllib.parse import urlparse # Py 3
@@ -22,12 +23,70 b' from tornado import web'
22 23 from tornado import websocket
23 24
24 25 from IPython.kernel.zmq.session import Session
25 from IPython.utils.jsonutil import date_default
26 from IPython.utils.jsonutil import date_default, extract_dates
26 27 from IPython.utils.py3compat import PY3, cast_unicode
27 28
28 29 from .handlers import IPythonHandler
29 30
30 31
32 def serialize_binary_message(msg):
33 """serialize a message as a binary blob
34
35 Header:
36
37 4 bytes: number of msg parts (nbufs) as 32b int
38 4 * nbufs bytes: offset for each buffer as integer as 32b int
39
40 Offsets are from the start of the buffer, including the header.
41
42 Returns
43 -------
44
45 The message serialized to bytes.
46
47 """
48 # don't modify msg or buffer list in-place
49 msg = msg.copy()
50 buffers = list(msg.pop('buffers'))
51 bmsg = json.dumps(msg, default=date_default).encode('utf8')
52 buffers.insert(0, bmsg)
53 nbufs = len(buffers)
54 offsets = [4 * (nbufs + 1)]
55 for buf in buffers[:-1]:
56 offsets.append(offsets[-1] + len(buf))
57 offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
58 buffers.insert(0, offsets_buf)
59 return b''.join(buffers)
60
61
62 def deserialize_binary_message(bmsg):
63 """deserialize a message from a binary blog
64
65 Header:
66
67 4 bytes: number of msg parts (nbufs) as 32b int
68 4 * nbufs bytes: offset for each buffer as integer as 32b int
69
70 Offsets are from the start of the buffer, including the header.
71
72 Returns
73 -------
74
75 message dictionary
76 """
77 nbufs = struct.unpack('!i', bmsg[:4])[0]
78 offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
79 offsets.append(None)
80 bufs = []
81 for start, stop in zip(offsets[:-1], offsets[1:]):
82 bufs.append(bmsg[start:stop])
83 msg = json.loads(bufs[0].decode('utf8'))
84 msg['header'] = extract_dates(msg['header'])
85 msg['parent_header'] = extract_dates(msg['parent_header'])
86 msg['buffers'] = bufs[1:]
87 return msg
88
89
31 90 class ZMQStreamHandler(websocket.WebSocketHandler):
32 91
33 92 def check_origin(self, origin):
@@ -77,23 +136,19 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
77 136 def _reserialize_reply(self, msg_list):
78 137 """Reserialize a reply message using JSON.
79 138
80 This takes the msg list from the ZMQ socket, unserializes it using
139 This takes the msg list from the ZMQ socket, deserializes it using
81 140 self.session and then serializes the result using JSON. This method
82 141 should be used by self._on_zmq_reply to build messages that can
83 142 be sent back to the browser.
84 143 """
85 144 idents, msg_list = self.session.feed_identities(msg_list)
86 msg = self.session.unserialize(msg_list)
87 try:
88 msg['header'].pop('date')
89 except KeyError:
90 pass
91 try:
92 msg['parent_header'].pop('date')
93 except KeyError:
94 pass
95 msg.pop('buffers')
96 return json.dumps(msg, default=date_default)
145 msg = self.session.deserialize(msg_list)
146 if msg['buffers']:
147 buf = serialize_binary_message(msg)
148 return buf
149 else:
150 smsg = json.dumps(msg, default=date_default)
151 return cast_unicode(smsg)
97 152
98 153 def _on_zmq_reply(self, msg_list):
99 154 # Sometimes this gets triggered when the on_close method is scheduled in the
@@ -104,7 +159,7 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
104 159 except Exception:
105 160 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
106 161 else:
107 self.write_message(msg)
162 self.write_message(msg, binary=isinstance(msg, bytes))
108 163
109 164 def allow_draft76(self):
110 165 """Allow draft 76, until browsers such as Safari update to RFC 6455.
@@ -12,7 +12,7 b' from IPython.utils.py3compat import string_types'
12 12 from IPython.html.utils import url_path_join, url_escape
13 13
14 14 from ...base.handlers import IPythonHandler, json_errors
15 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
15 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
16 16
17 17 from IPython.core.release import kernel_protocol_version
18 18
@@ -110,7 +110,7 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
110 110 """
111 111 idents,msg = self.session.feed_identities(msg)
112 112 try:
113 msg = self.session.unserialize(msg)
113 msg = self.session.deserialize(msg)
114 114 except:
115 115 self.log.error("Bad kernel_info reply", exc_info=True)
116 116 self._request_kernel_info()
@@ -150,6 +150,9 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
150 150 self.log.info("%s closed, closing websocket.", self)
151 151 self.close()
152 152 return
153 if isinstance(msg, bytes):
154 msg = deserialize_binary_message(msg)
155 else:
153 156 msg = json.loads(msg)
154 157 self.session.send(self.zmq_stream, msg)
155 158
1 NO CONTENT: modified file
@@ -1,1 +1,1 b''
1 Subproject commit 56b35d85bb0ea150458282f4064292a5c211025a
1 Subproject commit 1968f4f78d7e8cd227d0b3f4cc3183591969b52a
@@ -129,12 +129,12 b' define(['
129 129 return this.kernel.send_shell_message("comm_open", content, callbacks, metadata);
130 130 };
131 131
132 Comm.prototype.send = function (data, callbacks, metadata) {
132 Comm.prototype.send = function (data, callbacks, metadata, buffers) {
133 133 var content = {
134 134 comm_id : this.comm_id,
135 135 data : data || {},
136 136 };
137 return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata);
137 return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata, buffers);
138 138 };
139 139
140 140 Comm.prototype.close = function (data, callbacks, metadata) {
@@ -5,9 +5,10 b' define(['
5 5 'base/js/namespace',
6 6 'jquery',
7 7 'base/js/utils',
8 'services/kernels/js/comm',
9 'widgets/js/init',
10 ], function(IPython, $, utils, comm, widgetmanager) {
8 './comm',
9 './serialize',
10 'widgets/js/init'
11 ], function(IPython, $, utils, comm, serialize, widgetmanager) {
11 12 "use strict";
12 13
13 14 /**
@@ -69,7 +70,7 b' define(['
69 70 /**
70 71 * @function _get_msg
71 72 */
72 Kernel.prototype._get_msg = function (msg_type, content, metadata) {
73 Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) {
73 74 var msg = {
74 75 header : {
75 76 msg_id : utils.uuid(),
@@ -80,6 +81,7 b' define(['
80 81 },
81 82 metadata : metadata || {},
82 83 content : content,
84 buffers : buffers || [],
83 85 parent_header : {}
84 86 };
85 87 return msg;
@@ -596,12 +598,12 b' define(['
596 598 *
597 599 * @function send_shell_message
598 600 */
599 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata) {
601 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
600 602 if (!this.is_connected()) {
601 603 throw new Error("kernel is not connected");
602 604 }
603 var msg = this._get_msg(msg_type, content, metadata);
604 this.channels.shell.send(JSON.stringify(msg));
605 var msg = this._get_msg(msg_type, content, metadata, buffers);
606 this.channels.shell.send(serialize.serialize(msg));
605 607 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
606 608 return msg.header.msg_id;
607 609 };
@@ -752,7 +754,7 b' define(['
752 754 };
753 755 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
754 756 var msg = this._get_msg("input_reply", content);
755 this.channels.stdin.send(JSON.stringify(msg));
757 this.channels.stdin.send(serialize.serialize(msg));
756 758 return msg.header.msg_id;
757 759 };
758 760
@@ -850,7 +852,10 b' define(['
850 852 * @function _handle_shell_reply
851 853 */
852 854 Kernel.prototype._handle_shell_reply = function (e) {
853 var reply = $.parseJSON(e.data);
855 serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
856 };
857
858 Kernel.prototype._finish_shell_reply = function (reply) {
854 859 this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply});
855 860 var content = reply.content;
856 861 var metadata = reply.metadata;
@@ -978,8 +983,11 b' define(['
978 983 * @function _handle_iopub_message
979 984 */
980 985 Kernel.prototype._handle_iopub_message = function (e) {
981 var msg = $.parseJSON(e.data);
986 serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
987 };
988
982 989
990 Kernel.prototype._finish_iopub_message = function (msg) {
983 991 var handler = this.get_iopub_handler(msg.header.msg_type);
984 992 if (handler !== undefined) {
985 993 handler(msg);
@@ -990,7 +998,11 b' define(['
990 998 * @function _handle_input_request
991 999 */
992 1000 Kernel.prototype._handle_input_request = function (e) {
993 var request = $.parseJSON(e.data);
1001 serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
1002 };
1003
1004
1005 Kernel.prototype._finish_input_request = function (request) {
994 1006 var header = request.header;
995 1007 var content = request.content;
996 1008 var metadata = request.metadata;
@@ -317,6 +317,7 b' class="notebook_app"'
317 317 {% block script %}
318 318 {{super()}}
319 319
320 <script src="{{ static_url("components/text-encoding/lib/encoding.js") }}" charset="utf-8"></script>
320 321
321 322 <script src="{{ static_url("notebook/js/main.js") }}" charset="utf-8"></script>
322 323
@@ -176,7 +176,7 b' class ZMQSocketChannel(Thread):'
176 176 Unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.unserialize(smsg)
179 msg = self.session.deserialize(smsg)
180 180 self.call_handlers(msg)
181 181
182 182
@@ -57,7 +57,7 b' class Comm(LoggingConfigurable):'
57 57 # I am primary, open my peer.
58 58 self.open(data)
59 59
60 def _publish_msg(self, msg_type, data=None, metadata=None, **keys):
60 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
61 61 """Helper for sending a comm message on IOPub"""
62 62 data = {} if data is None else data
63 63 metadata = {} if metadata is None else metadata
@@ -67,6 +67,7 b' class Comm(LoggingConfigurable):'
67 67 metadata=json_clean(metadata),
68 68 parent=self.kernel._parent_header,
69 69 ident=self.topic,
70 buffers=buffers,
70 71 )
71 72
72 73 def __del__(self):
@@ -75,7 +76,7 b' class Comm(LoggingConfigurable):'
75 76
76 77 # publishing messages
77 78
78 def open(self, data=None, metadata=None):
79 def open(self, data=None, metadata=None, buffers=None):
79 80 """Open the frontend-side version of this comm"""
80 81 if data is None:
81 82 data = self._open_data
@@ -86,22 +87,29 b' class Comm(LoggingConfigurable):'
86 87
87 88 comm_manager.register_comm(self)
88 89 self._closed = False
89 self._publish_msg('comm_open', data, metadata, target_name=self.target_name)
90 self._publish_msg('comm_open',
91 data=data, metadata=metadata, buffers=buffers,
92 target_name=self.target_name,
93 )
90 94
91 def close(self, data=None, metadata=None):
95 def close(self, data=None, metadata=None, buffers=None):
92 96 """Close the frontend-side version of this comm"""
93 97 if self._closed:
94 98 # only close once
95 99 return
96 100 if data is None:
97 101 data = self._close_data
98 self._publish_msg('comm_close', data, metadata)
102 self._publish_msg('comm_close',
103 data=data, metadata=metadata, buffers=buffers,
104 )
99 105 self.kernel.comm_manager.unregister_comm(self)
100 106 self._closed = True
101 107
102 def send(self, data=None, metadata=None):
108 def send(self, data=None, metadata=None, buffers=None):
103 109 """Send a message to the frontend-side version of this comm"""
104 self._publish_msg('comm_msg', data, metadata)
110 self._publish_msg('comm_msg',
111 data=data, metadata=metadata, buffers=buffers,
112 )
105 113
106 114 # registering callbacks
107 115
@@ -130,7 +130,7 b' class Kernel(SingletonConfigurable):'
130 130 """dispatch control requests"""
131 131 idents,msg = self.session.feed_identities(msg, copy=False)
132 132 try:
133 msg = self.session.unserialize(msg, content=True, copy=False)
133 msg = self.session.deserialize(msg, content=True, copy=False)
134 134 except:
135 135 self.log.error("Invalid Control Message", exc_info=True)
136 136 return
@@ -165,7 +165,7 b' class Kernel(SingletonConfigurable):'
165 165
166 166 idents,msg = self.session.feed_identities(msg, copy=False)
167 167 try:
168 msg = self.session.unserialize(msg, content=True, copy=False)
168 msg = self.session.deserialize(msg, content=True, copy=False)
169 169 except:
170 170 self.log.error("Invalid Message", exc_info=True)
171 171 return
@@ -89,7 +89,7 b' def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):'
89 89 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
90 90 return buffers
91 91
92 def unserialize_object(buffers, g=None):
92 def deserialize_object(buffers, g=None):
93 93 """reconstruct an object serialized by serialize_object from data buffers.
94 94
95 95 Parameters
@@ -170,14 +170,14 b' def unpack_apply_message(bufs, g=None, copy=True):'
170 170
171 171 args = []
172 172 for i in range(info['nargs']):
173 arg, arg_bufs = unserialize_object(arg_bufs, g)
173 arg, arg_bufs = deserialize_object(arg_bufs, g)
174 174 args.append(arg)
175 175 args = tuple(args)
176 176 assert not arg_bufs, "Shouldn't be any arg bufs left over"
177 177
178 178 kwargs = {}
179 179 for key in info['kw_keys']:
180 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
180 kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
181 181 kwargs[key] = kwarg
182 182 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
183 183
@@ -18,6 +18,7 b' import os'
18 18 import pprint
19 19 import random
20 20 import uuid
21 import warnings
21 22 from datetime import datetime
22 23
23 24 try:
@@ -492,7 +493,7 b' class Session(Configurable):'
492 493 """Return the nested message dict.
493 494
494 495 This format is different from what is sent over the wire. The
495 serialize/unserialize methods converts this nested message dict to the wire
496 serialize/deserialize methods converts this nested message dict to the wire
496 497 format, which is a list of message parts.
497 498 """
498 499 msg = {}
@@ -525,7 +526,7 b' class Session(Configurable):'
525 526 def serialize(self, msg, ident=None):
526 527 """Serialize the message components to bytes.
527 528
528 This is roughly the inverse of unserialize. The serialize/unserialize
529 This is roughly the inverse of deserialize. The serialize/deserialize
529 530 methods work with full message lists, whereas pack/unpack work with
530 531 the individual message parts in the message list.
531 532
@@ -590,7 +591,7 b' class Session(Configurable):'
590 591 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
591 592 buffer1,buffer2,...]
592 593
593 The serialize/unserialize methods convert the nested message dict into this
594 The serialize/deserialize methods convert the nested message dict into this
594 595 format.
595 596
596 597 Parameters
@@ -634,6 +635,7 b' class Session(Configurable):'
634 635 # We got a Message or message dict, not a msg_type so don't
635 636 # build a new Message.
636 637 msg = msg_or_type
638 buffers = buffers or msg.get('buffers', [])
637 639 else:
638 640 msg = self.msg(msg_or_type, content=content, parent=parent,
639 641 header=header, metadata=metadata)
@@ -722,7 +724,7 b' class Session(Configurable):'
722 724 # invalid large messages can cause very expensive string comparisons
723 725 idents, msg_list = self.feed_identities(msg_list, copy)
724 726 try:
725 return idents, self.unserialize(msg_list, content=content, copy=copy)
727 return idents, self.deserialize(msg_list, content=content, copy=copy)
726 728 except Exception as e:
727 729 # TODO: handle it
728 730 raise e
@@ -747,7 +749,7 b' class Session(Configurable):'
747 749 idents will always be a list of bytes, each of which is a ZMQ
748 750 identity. msg_list will be a list of bytes or zmq.Messages of the
749 751 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
750 should be unpackable/unserializable via self.unserialize at this
752 should be unpackable/unserializable via self.deserialize at this
751 753 point.
752 754 """
753 755 if copy:
@@ -788,10 +790,10 b' class Session(Configurable):'
788 790 to_cull = random.sample(self.digest_history, n_to_cull)
789 791 self.digest_history.difference_update(to_cull)
790 792
791 def unserialize(self, msg_list, content=True, copy=True):
793 def deserialize(self, msg_list, content=True, copy=True):
792 794 """Unserialize a msg_list to a nested message dict.
793 795
794 This is roughly the inverse of serialize. The serialize/unserialize
796 This is roughly the inverse of serialize. The serialize/deserialize
795 797 methods work with full message lists, whereas pack/unpack work with
796 798 the individual message parts in the message list.
797 799
@@ -842,10 +844,16 b' class Session(Configurable):'
842 844 message['content'] = msg_list[4]
843 845
844 846 message['buffers'] = msg_list[5:]
845 # print("received: %s: %s\n %s" % (message['msg_type'], message['header'], message['content']))
846 847 # adapt to the current version
847 848 return adapt(message)
848 # print("adapted: %s: %s\n %s" % (adapted['msg_type'], adapted['header'], adapted['content']))
849
850 def unserialize(self, *args, **kwargs):
851 warnings.warn(
852 "Session.unserialize is deprecated. Use Session.deserialize.",
853 DeprecationWarning,
854 )
855 return self.deserialize(*args, **kwargs)
856
849 857
850 858 def test_msg2obj():
851 859 am = dict(x=1)
@@ -9,7 +9,7 b' from collections import namedtuple'
9 9 import nose.tools as nt
10 10
11 11 # from unittest import TestCaes
12 from IPython.kernel.zmq.serialize import serialize_object, unserialize_object
12 from IPython.kernel.zmq.serialize import serialize_object, deserialize_object
13 13 from IPython.testing import decorators as dec
14 14 from IPython.utils.pickleutil import CannedArray, CannedClass
15 15 from IPython.utils.py3compat import iteritems
@@ -22,7 +22,7 b' from IPython.parallel import interactive'
22 22 def roundtrip(obj):
23 23 """roundtrip an object through serialization"""
24 24 bufs = serialize_object(obj)
25 obj2, remainder = unserialize_object(bufs)
25 obj2, remainder = deserialize_object(bufs)
26 26 nt.assert_equals(remainder, [])
27 27 return obj2
28 28
@@ -70,7 +70,7 b' def test_roundtrip_buffered():'
70 70 ]:
71 71 bufs = serialize_object(obj)
72 72 nt.assert_equal(len(bufs), 2)
73 obj2, remainder = unserialize_object(bufs)
73 obj2, remainder = deserialize_object(bufs)
74 74 nt.assert_equal(remainder, [])
75 75 nt.assert_equal(obj, obj2)
76 76
@@ -82,7 +82,7 b' def test_numpy():'
82 82 for dtype in DTYPES:
83 83 A = new_array(shape, dtype=dtype)
84 84 bufs = serialize_object(A)
85 B, r = unserialize_object(bufs)
85 B, r = deserialize_object(bufs)
86 86 nt.assert_equal(r, [])
87 87 nt.assert_equal(A.shape, B.shape)
88 88 nt.assert_equal(A.dtype, B.dtype)
@@ -100,7 +100,7 b' def test_recarray():'
100 100 A = new_array(shape, dtype=dtype)
101 101
102 102 bufs = serialize_object(A)
103 B, r = unserialize_object(bufs)
103 B, r = deserialize_object(bufs)
104 104 nt.assert_equal(r, [])
105 105 nt.assert_equal(A.shape, B.shape)
106 106 nt.assert_equal(A.dtype, B.dtype)
@@ -116,7 +116,7 b' def test_numpy_in_seq():'
116 116 bufs = serialize_object((A,1,2,b'hello'))
117 117 canned = pickle.loads(bufs[0])
118 118 nt.assert_is_instance(canned[0], CannedArray)
119 tup, r = unserialize_object(bufs)
119 tup, r = deserialize_object(bufs)
120 120 B = tup[0]
121 121 nt.assert_equal(r, [])
122 122 nt.assert_equal(A.shape, B.shape)
@@ -133,7 +133,7 b' def test_numpy_in_dict():'
133 133 bufs = serialize_object(dict(a=A,b=1,c=range(20)))
134 134 canned = pickle.loads(bufs[0])
135 135 nt.assert_is_instance(canned['a'], CannedArray)
136 d, r = unserialize_object(bufs)
136 d, r = deserialize_object(bufs)
137 137 B = d['a']
138 138 nt.assert_equal(r, [])
139 139 nt.assert_equal(A.shape, B.shape)
@@ -147,7 +147,7 b' def test_class():'
147 147 bufs = serialize_object(dict(C=C))
148 148 canned = pickle.loads(bufs[0])
149 149 nt.assert_is_instance(canned['C'], CannedClass)
150 d, r = unserialize_object(bufs)
150 d, r = deserialize_object(bufs)
151 151 C2 = d['C']
152 152 nt.assert_equal(C2.a, C.a)
153 153
@@ -159,7 +159,7 b' def test_class_oldstyle():'
159 159 bufs = serialize_object(dict(C=C))
160 160 canned = pickle.loads(bufs[0])
161 161 nt.assert_is_instance(canned['C'], CannedClass)
162 d, r = unserialize_object(bufs)
162 d, r = deserialize_object(bufs)
163 163 C2 = d['C']
164 164 nt.assert_equal(C2.a, C.a)
165 165
@@ -168,7 +168,7 b' def test_tuple():'
168 168 bufs = serialize_object(tup)
169 169 canned = pickle.loads(bufs[0])
170 170 nt.assert_is_instance(canned, tuple)
171 t2, r = unserialize_object(bufs)
171 t2, r = deserialize_object(bufs)
172 172 nt.assert_equal(t2[0](t2[1]), tup[0](tup[1]))
173 173
174 174 point = namedtuple('point', 'x y')
@@ -178,7 +178,7 b' def test_namedtuple():'
178 178 bufs = serialize_object(p)
179 179 canned = pickle.loads(bufs[0])
180 180 nt.assert_is_instance(canned, point)
181 p2, r = unserialize_object(bufs, globals())
181 p2, r = deserialize_object(bufs, globals())
182 182 nt.assert_equal(p2.x, p.x)
183 183 nt.assert_equal(p2.y, p.y)
184 184
@@ -187,7 +187,7 b' def test_list():'
187 187 bufs = serialize_object(lis)
188 188 canned = pickle.loads(bufs[0])
189 189 nt.assert_is_instance(canned, list)
190 l2, r = unserialize_object(bufs)
190 l2, r = deserialize_object(bufs)
191 191 nt.assert_equal(l2[0](l2[1]), lis[0](lis[1]))
192 192
193 193 def test_class_inheritance():
@@ -202,7 +202,7 b' def test_class_inheritance():'
202 202 bufs = serialize_object(dict(D=D))
203 203 canned = pickle.loads(bufs[0])
204 204 nt.assert_is_instance(canned['D'], CannedClass)
205 d, r = unserialize_object(bufs)
205 d, r = deserialize_object(bufs)
206 206 D2 = d['D']
207 207 nt.assert_equal(D2.a, D.a)
208 208 nt.assert_equal(D2.b, D.b)
@@ -60,7 +60,7 b' class TestSession(SessionTestCase):'
60 60 msg = self.session.msg('execute', content=dict(a=10, b=1.1))
61 61 msg_list = self.session.serialize(msg, ident=b'foo')
62 62 ident, msg_list = self.session.feed_identities(msg_list)
63 new_msg = self.session.unserialize(msg_list)
63 new_msg = self.session.deserialize(msg_list)
64 64 self.assertEqual(ident[0], b'foo')
65 65 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
66 66 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -82,7 +82,7 b' class TestSession(SessionTestCase):'
82 82 self.session.send(A, msg, ident=b'foo', buffers=[b'bar'])
83 83
84 84 ident, msg_list = self.session.feed_identities(B.recv_multipart())
85 new_msg = self.session.unserialize(msg_list)
85 new_msg = self.session.deserialize(msg_list)
86 86 self.assertEqual(ident[0], b'foo')
87 87 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
88 88 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -100,7 +100,7 b' class TestSession(SessionTestCase):'
100 100 self.session.send(A, None, content=content, parent=parent,
101 101 header=header, metadata=metadata, ident=b'foo', buffers=[b'bar'])
102 102 ident, msg_list = self.session.feed_identities(B.recv_multipart())
103 new_msg = self.session.unserialize(msg_list)
103 new_msg = self.session.deserialize(msg_list)
104 104 self.assertEqual(ident[0], b'foo')
105 105 self.assertEqual(new_msg['msg_id'],msg['msg_id'])
106 106 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
@@ -263,7 +263,7 b' class TestSession(SessionTestCase):'
263 263 p = session.msg('msg')
264 264 msg = session.msg('msg', content=content, metadata=metadata, parent=p['header'])
265 265 smsg = session.serialize(msg)
266 msg2 = session.unserialize(session.feed_identities(smsg)[1])
266 msg2 = session.deserialize(session.feed_identities(smsg)[1])
267 267 assert isinstance(msg2['header']['date'], datetime)
268 268 self.assertEqual(msg['header'], msg2['header'])
269 269 self.assertEqual(msg['parent_header'], msg2['parent_header'])
@@ -305,7 +305,7 b' class TestSession(SessionTestCase):'
305 305 self.session.send_raw(A, msg_list, ident=b'foo')
306 306
307 307 ident, new_msg_list = self.session.feed_identities(B.recv_multipart())
308 new_msg = self.session.unserialize(new_msg_list)
308 new_msg = self.session.deserialize(new_msg_list)
309 309 self.assertEqual(ident[0], b'foo')
310 310 self.assertEqual(new_msg['msg_type'],msg['msg_type'])
311 311 self.assertEqual(new_msg['header'],msg['header'])
@@ -801,7 +801,7 b' class Client(HasTraits):'
801 801
802 802 # construct result:
803 803 if content['status'] == 'ok':
804 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
804 self.results[msg_id] = serialize.deserialize_object(msg['buffers'])[0]
805 805 elif content['status'] == 'aborted':
806 806 self.results[msg_id] = error.TaskAborted(msg_id)
807 807 elif content['status'] == 'resubmitted':
@@ -903,7 +903,7 b' class Client(HasTraits):'
903 903 elif msg_type == 'execute_result':
904 904 md['execute_result'] = content
905 905 elif msg_type == 'data_message':
906 data, remainder = serialize.unserialize_object(msg['buffers'])
906 data, remainder = serialize.deserialize_object(msg['buffers'])
907 907 md['data'].update(data)
908 908 elif msg_type == 'status':
909 909 # idle message comes after all outputs
@@ -1612,7 +1612,7 b' class Client(HasTraits):'
1612 1612
1613 1613 if rcontent['status'] == 'ok':
1614 1614 if header['msg_type'] == 'apply_reply':
1615 res,buffers = serialize.unserialize_object(buffers)
1615 res,buffers = serialize.deserialize_object(buffers)
1616 1616 elif header['msg_type'] == 'execute_reply':
1617 1617 res = ExecuteReply(msg_id, rcontent, md)
1618 1618 else:
@@ -523,7 +523,7 b' class Hub(SessionFactory):'
523 523 return
524 524 client_id = idents[0]
525 525 try:
526 msg = self.session.unserialize(msg, content=True)
526 msg = self.session.deserialize(msg, content=True)
527 527 except Exception:
528 528 content = error.wrap_exception()
529 529 self.log.error("Bad Query Message: %r", msg, exc_info=True)
@@ -588,7 +588,7 b' class Hub(SessionFactory):'
588 588 return
589 589 queue_id, client_id = idents[:2]
590 590 try:
591 msg = self.session.unserialize(msg)
591 msg = self.session.deserialize(msg)
592 592 except Exception:
593 593 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
594 594 return
@@ -636,7 +636,7 b' class Hub(SessionFactory):'
636 636
637 637 client_id, queue_id = idents[:2]
638 638 try:
639 msg = self.session.unserialize(msg)
639 msg = self.session.deserialize(msg)
640 640 except Exception:
641 641 self.log.error("queue::engine %r sent invalid message to %r: %r",
642 642 queue_id, client_id, msg, exc_info=True)
@@ -690,7 +690,7 b' class Hub(SessionFactory):'
690 690 client_id = idents[0]
691 691
692 692 try:
693 msg = self.session.unserialize(msg)
693 msg = self.session.deserialize(msg)
694 694 except Exception:
695 695 self.log.error("task::client %r sent invalid task message: %r",
696 696 client_id, msg, exc_info=True)
@@ -740,7 +740,7 b' class Hub(SessionFactory):'
740 740 """save the result of a completed task."""
741 741 client_id = idents[0]
742 742 try:
743 msg = self.session.unserialize(msg)
743 msg = self.session.deserialize(msg)
744 744 except Exception:
745 745 self.log.error("task::invalid task result message send to %r: %r",
746 746 client_id, msg, exc_info=True)
@@ -794,7 +794,7 b' class Hub(SessionFactory):'
794 794
795 795 def save_task_destination(self, idents, msg):
796 796 try:
797 msg = self.session.unserialize(msg, content=True)
797 msg = self.session.deserialize(msg, content=True)
798 798 except Exception:
799 799 self.log.error("task::invalid task tracking message", exc_info=True)
800 800 return
@@ -831,7 +831,7 b' class Hub(SessionFactory):'
831 831 """save an iopub message into the db"""
832 832 # print (topics)
833 833 try:
834 msg = self.session.unserialize(msg, content=True)
834 msg = self.session.deserialize(msg, content=True)
835 835 except Exception:
836 836 self.log.error("iopub::invalid IOPub message", exc_info=True)
837 837 return
@@ -251,7 +251,7 b' class TaskScheduler(SessionFactory):'
251 251 self.log.warn("task::Invalid Message: %r",msg)
252 252 return
253 253 try:
254 msg = self.session.unserialize(msg)
254 msg = self.session.deserialize(msg)
255 255 except ValueError:
256 256 self.log.warn("task::Unauthorized message from: %r"%idents)
257 257 return
@@ -270,7 +270,7 b' class TaskScheduler(SessionFactory):'
270 270 self.log.warn("task::Invalid Message: %r",msg)
271 271 return
272 272 try:
273 msg = self.session.unserialize(msg)
273 msg = self.session.deserialize(msg)
274 274 except ValueError:
275 275 self.log.warn("task::Unauthorized message from: %r"%idents)
276 276 return
@@ -375,7 +375,7 b' class TaskScheduler(SessionFactory):'
375 375 self.notifier_stream.flush()
376 376 try:
377 377 idents, msg = self.session.feed_identities(raw_msg, copy=False)
378 msg = self.session.unserialize(msg, content=False, copy=False)
378 msg = self.session.deserialize(msg, content=False, copy=False)
379 379 except Exception:
380 380 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
381 381 return
@@ -621,7 +621,7 b' class TaskScheduler(SessionFactory):'
621 621 """dispatch method for result replies"""
622 622 try:
623 623 idents,msg = self.session.feed_identities(raw_msg, copy=False)
624 msg = self.session.unserialize(msg, content=False, copy=False)
624 msg = self.session.deserialize(msg, content=False, copy=False)
625 625 engine = idents[0]
626 626 try:
627 627 idx = self.targets.index(engine)
@@ -159,7 +159,7 b' class EngineFactory(RegistrationFactory):'
159 159 loop = self.loop
160 160 identity = self.bident
161 161 idents,msg = self.session.feed_identities(msg)
162 msg = self.session.unserialize(msg)
162 msg = self.session.deserialize(msg)
163 163 content = msg['content']
164 164 info = self.connection_info
165 165
@@ -162,6 +162,7 b' def find_package_data():'
162 162 pjoin(components, "underscore", "underscore-min.js"),
163 163 pjoin(components, "moment", "moment.js"),
164 164 pjoin(components, "moment", "min","moment.min.js"),
165 pjoin(components, "text-encoding", "lib", "encoding.js"),
165 166 ])
166 167
167 168 # Ship all of Codemirror's CSS and JS
General Comments 0
You need to be logged in to leave comments. Login now