From 2a8a2c87afc9cc2d1f81ee82b5c18d050b83726d 2014-10-19 09:03:24 From: Matthias Bussonnier Date: 2014-10-19 09:03:24 Subject: [PATCH] Merge pull request #6110 from minrk/binarycomm support binary buffers in comm messages --- diff --git a/IPython/html/base/zmqhandlers.py b/IPython/html/base/zmqhandlers.py index b3d7810..d0b77a1 100644 --- a/IPython/html/base/zmqhandlers.py +++ b/IPython/html/base/zmqhandlers.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. import json +import struct try: from urllib.parse import urlparse # Py 3 @@ -22,12 +23,70 @@ from tornado import web from tornado import websocket from IPython.kernel.zmq.session import Session -from IPython.utils.jsonutil import date_default +from IPython.utils.jsonutil import date_default, extract_dates from IPython.utils.py3compat import PY3, cast_unicode from .handlers import IPythonHandler +def serialize_binary_message(msg): + """serialize a message as a binary blob + + Header: + + 4 bytes: number of msg parts (nbufs) as 32b int + 4 * nbufs bytes: offset for each buffer as integer as 32b int + + Offsets are from the start of the buffer, including the header. + + Returns + ------- + + The message serialized to bytes. + + """ + # don't modify msg or buffer list in-place + msg = msg.copy() + buffers = list(msg.pop('buffers')) + bmsg = json.dumps(msg, default=date_default).encode('utf8') + buffers.insert(0, bmsg) + nbufs = len(buffers) + offsets = [4 * (nbufs + 1)] + for buf in buffers[:-1]: + offsets.append(offsets[-1] + len(buf)) + offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets) + buffers.insert(0, offsets_buf) + return b''.join(buffers) + + +def deserialize_binary_message(bmsg): + """deserialize a message from a binary blog + + Header: + + 4 bytes: number of msg parts (nbufs) as 32b int + 4 * nbufs bytes: offset for each buffer as integer as 32b int + + Offsets are from the start of the buffer, including the header. + + Returns + ------- + + message dictionary + """ + nbufs = struct.unpack('!i', bmsg[:4])[0] + offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)])) + offsets.append(None) + bufs = [] + for start, stop in zip(offsets[:-1], offsets[1:]): + bufs.append(bmsg[start:stop]) + msg = json.loads(bufs[0].decode('utf8')) + msg['header'] = extract_dates(msg['header']) + msg['parent_header'] = extract_dates(msg['parent_header']) + msg['buffers'] = bufs[1:] + return msg + + class ZMQStreamHandler(websocket.WebSocketHandler): def check_origin(self, origin): @@ -77,23 +136,19 @@ class ZMQStreamHandler(websocket.WebSocketHandler): def _reserialize_reply(self, msg_list): """Reserialize a reply message using JSON. - This takes the msg list from the ZMQ socket, unserializes it using + This takes the msg list from the ZMQ socket, deserializes it using self.session and then serializes the result using JSON. This method should be used by self._on_zmq_reply to build messages that can be sent back to the browser. """ idents, msg_list = self.session.feed_identities(msg_list) - msg = self.session.unserialize(msg_list) - try: - msg['header'].pop('date') - except KeyError: - pass - try: - msg['parent_header'].pop('date') - except KeyError: - pass - msg.pop('buffers') - return json.dumps(msg, default=date_default) + msg = self.session.deserialize(msg_list) + if msg['buffers']: + buf = serialize_binary_message(msg) + return buf + else: + smsg = json.dumps(msg, default=date_default) + return cast_unicode(smsg) def _on_zmq_reply(self, msg_list): # Sometimes this gets triggered when the on_close method is scheduled in the @@ -104,7 +159,7 @@ class ZMQStreamHandler(websocket.WebSocketHandler): except Exception: self.log.critical("Malformed message: %r" % msg_list, exc_info=True) else: - self.write_message(msg) + self.write_message(msg, binary=isinstance(msg, bytes)) def allow_draft76(self): """Allow draft 76, until browsers such as Safari update to RFC 6455. diff --git a/IPython/html/services/kernels/handlers.py b/IPython/html/services/kernels/handlers.py index 7d89ad6..6c5f2df 100644 --- a/IPython/html/services/kernels/handlers.py +++ b/IPython/html/services/kernels/handlers.py @@ -12,7 +12,7 @@ from IPython.utils.py3compat import string_types from IPython.html.utils import url_path_join, url_escape from ...base.handlers import IPythonHandler, json_errors -from ...base.zmqhandlers import AuthenticatedZMQStreamHandler +from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message from IPython.core.release import kernel_protocol_version @@ -110,7 +110,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): """ idents,msg = self.session.feed_identities(msg) try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except: self.log.error("Bad kernel_info reply", exc_info=True) self._request_kernel_info() @@ -150,7 +150,10 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): self.log.info("%s closed, closing websocket.", self) self.close() return - msg = json.loads(msg) + if isinstance(msg, bytes): + msg = deserialize_binary_message(msg) + else: + msg = json.loads(msg) self.session.send(self.zmq_stream, msg) def on_close(self): diff --git a/IPython/html/static/base/js/utils.js b/IPython/html/static/base/js/utils.js index 61eb84a..8e25303 100644 --- a/IPython/html/static/base/js/utils.js +++ b/IPython/html/static/base/js/utils.js @@ -553,7 +553,7 @@ define([ ], callback, errback ); }; - + var utils = { regex_split : regex_split, uuid : uuid, diff --git a/IPython/html/static/components b/IPython/html/static/components index 56b35d8..1968f4f 160000 --- a/IPython/html/static/components +++ b/IPython/html/static/components @@ -1 +1 @@ -Subproject commit 56b35d85bb0ea150458282f4064292a5c211025a +Subproject commit 1968f4f78d7e8cd227d0b3f4cc3183591969b52a diff --git a/IPython/html/static/services/kernels/js/comm.js b/IPython/html/static/services/kernels/js/comm.js index 04307cf..91f3dc8 100644 --- a/IPython/html/static/services/kernels/js/comm.js +++ b/IPython/html/static/services/kernels/js/comm.js @@ -129,12 +129,12 @@ define([ return this.kernel.send_shell_message("comm_open", content, callbacks, metadata); }; - Comm.prototype.send = function (data, callbacks, metadata) { + Comm.prototype.send = function (data, callbacks, metadata, buffers) { var content = { comm_id : this.comm_id, data : data || {}, }; - return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata); + return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata, buffers); }; Comm.prototype.close = function (data, callbacks, metadata) { diff --git a/IPython/html/static/services/kernels/js/kernel.js b/IPython/html/static/services/kernels/js/kernel.js index 3a25e90..9ad4fcb 100644 --- a/IPython/html/static/services/kernels/js/kernel.js +++ b/IPython/html/static/services/kernels/js/kernel.js @@ -5,9 +5,10 @@ define([ 'base/js/namespace', 'jquery', 'base/js/utils', - 'services/kernels/js/comm', - 'widgets/js/init', -], function(IPython, $, utils, comm, widgetmanager) { + './comm', + './serialize', + 'widgets/js/init' +], function(IPython, $, utils, comm, serialize, widgetmanager) { "use strict"; /** @@ -69,7 +70,7 @@ define([ /** * @function _get_msg */ - Kernel.prototype._get_msg = function (msg_type, content, metadata) { + Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) { var msg = { header : { msg_id : utils.uuid(), @@ -80,6 +81,7 @@ define([ }, metadata : metadata || {}, content : content, + buffers : buffers || [], parent_header : {} }; return msg; @@ -596,12 +598,12 @@ define([ * * @function send_shell_message */ - Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata) { + Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) { if (!this.is_connected()) { throw new Error("kernel is not connected"); } - var msg = this._get_msg(msg_type, content, metadata); - this.channels.shell.send(JSON.stringify(msg)); + var msg = this._get_msg(msg_type, content, metadata, buffers); + this.channels.shell.send(serialize.serialize(msg)); this.set_callbacks_for_msg(msg.header.msg_id, callbacks); return msg.header.msg_id; }; @@ -752,7 +754,7 @@ define([ }; this.events.trigger('input_reply.Kernel', {kernel: this, content: content}); var msg = this._get_msg("input_reply", content); - this.channels.stdin.send(JSON.stringify(msg)); + this.channels.stdin.send(serialize.serialize(msg)); return msg.header.msg_id; }; @@ -850,8 +852,11 @@ define([ * @function _handle_shell_reply */ Kernel.prototype._handle_shell_reply = function (e) { - var reply = $.parseJSON(e.data); - this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply}); + serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this)); + }; + + Kernel.prototype._finish_shell_reply = function (reply) { + this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply}); var content = reply.content; var metadata = reply.metadata; var parent_id = reply.parent_header.msg_id; @@ -978,8 +983,11 @@ define([ * @function _handle_iopub_message */ Kernel.prototype._handle_iopub_message = function (e) { - var msg = $.parseJSON(e.data); + serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this)); + }; + + Kernel.prototype._finish_iopub_message = function (msg) { var handler = this.get_iopub_handler(msg.header.msg_type); if (handler !== undefined) { handler(msg); @@ -990,7 +998,11 @@ define([ * @function _handle_input_request */ Kernel.prototype._handle_input_request = function (e) { - var request = $.parseJSON(e.data); + serialize.deserialize(e.data, $.proxy(this._finish_input_request, this)); + }; + + + Kernel.prototype._finish_input_request = function (request) { var header = request.header; var content = request.content; var metadata = request.metadata; diff --git a/IPython/html/static/services/kernels/js/serialize.js b/IPython/html/static/services/kernels/js/serialize.js new file mode 100644 index 0000000..c86e366 --- /dev/null +++ b/IPython/html/static/services/kernels/js/serialize.js @@ -0,0 +1,114 @@ +// Copyright (c) IPython Development Team. +// Distributed under the terms of the Modified BSD License. + +define([ + 'underscore', + ], function (_) { + "use strict"; + + var _deserialize_array_buffer = function (buf) { + var data = new DataView(buf); + // read the header: 1 + nbufs 32b integers + var nbufs = data.getUint32(0); + var offsets = []; + var i; + for (i = 1; i <= nbufs; i++) { + offsets.push(data.getUint32(i * 4)); + } + var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1])); + var msg = JSON.parse( + (new TextDecoder('utf8')).decode(json_bytes) + ); + // the remaining chunks are stored as DataViews in msg.buffers + msg.buffers = []; + var start, stop; + for (i = 1; i < nbufs; i++) { + start = offsets[i]; + stop = offsets[i+1] || buf.byteLength; + msg.buffers.push(new DataView(buf.slice(start, stop))); + } + return msg; + }; + + var _deserialize_binary = function(data, callback) { + // deserialize the binary message format + // callback will be called with a message whose buffers attribute + // will be an array of DataViews. + if (data instanceof Blob) { + // data is Blob, have to deserialize from ArrayBuffer in reader callback + var reader = new FileReader(); + reader.onload = function () { + var msg = _deserialize_array_buffer(this.result); + callback(msg); + }; + reader.readAsArrayBuffer(data); + } else { + // data is ArrayBuffer, can deserialize directly + var msg = _deserialize_array_buffer(data); + callback(msg); + } + }; + + var deserialize = function (data, callback) { + // deserialize a message and pass the unpacked message object to callback + if (typeof data === "string") { + // text JSON message + callback(JSON.parse(data)); + } else { + // binary message + _deserialize_binary(data, callback); + } + }; + + var _serialize_binary = function (msg) { + // implement the binary serialization protocol + // serializes JSON message to ArrayBuffer + msg = _.clone(msg); + var offsets = []; + var buffers = []; + msg.buffers.map(function (buf) { + buffers.push(buf); + }); + delete msg.buffers; + var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg)); + buffers.unshift(json_utf8); + var nbufs = buffers.length; + offsets.push(4 * (nbufs + 1)); + var i; + for (i = 0; i + 1 < buffers.length; i++) { + offsets.push(offsets[offsets.length-1] + buffers[i].byteLength); + } + var msg_buf = new Uint8Array( + offsets[offsets.length-1] + buffers[buffers.length-1].byteLength + ); + // use DataView.setUint32 for network byte-order + var view = new DataView(msg_buf.buffer); + // write nbufs to first 4 bytes + view.setUint32(0, nbufs); + // write offsets to next 4 * nbufs bytes + for (i = 0; i < offsets.length; i++) { + view.setUint32(4 * (i+1), offsets[i]); + } + // write all the buffers at their respective offsets + for (i = 0; i < buffers.length; i++) { + msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]); + } + + // return raw ArrayBuffer + return msg_buf.buffer; + }; + + var serialize = function (msg) { + if (msg.buffers && msg.buffers.length) { + return _serialize_binary(msg); + } else { + return JSON.stringify(msg); + } + }; + + var exports = { + deserialize : deserialize, + serialize: serialize + }; + return exports; +}); \ No newline at end of file diff --git a/IPython/html/templates/notebook.html b/IPython/html/templates/notebook.html index c116506..790f748 100644 --- a/IPython/html/templates/notebook.html +++ b/IPython/html/templates/notebook.html @@ -317,6 +317,7 @@ class="notebook_app" {% block script %} {{super()}} + diff --git a/IPython/html/tests/services/serialize.js b/IPython/html/tests/services/serialize.js new file mode 100644 index 0000000..fe61bf1 --- /dev/null +++ b/IPython/html/tests/services/serialize.js @@ -0,0 +1,113 @@ +// +// Test binary messages on websockets. +// Only works on slimer for now, due to old websocket impl in phantomjs. +// + +casper.notebook_test(function () { + if (!this.slimerjs) { + console.log("Can't test binary websockets on phantomjs."); + return; + } + // create EchoBuffers target on js-side. + // it just captures and echos comm messages. + this.then(function () { + var success = this.evaluate(function () { + IPython._msgs = []; + + var EchoBuffers = function(comm) { + this.comm = comm; + this.comm.on_msg($.proxy(this.on_msg, this)); + }; + + EchoBuffers.prototype.on_msg = function (msg) { + IPython._msgs.push(msg); + this.comm.send(msg.content.data, {}, {}, msg.buffers); + }; + + IPython.notebook.kernel.comm_manager.register_target("echo", function (comm) { + return new EchoBuffers(comm); + }); + + return true; + }); + this.test.assertEquals(success, true, "Created echo comm target"); + }); + + // Create a similar comm that captures messages Python-side + this.then(function () { + var index = this.append_cell([ + "import os", + "from IPython.kernel.comm import Comm", + "comm = Comm(target_name='echo')", + "msgs = []", + "def on_msg(msg):", + " msgs.append(msg)", + "comm.on_msg(on_msg)" + ].join('\n'), 'code'); + this.execute_cell(index); + }); + + // send a message with binary data + this.then(function () { + var index = this.append_cell([ + "buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']", + "comm.send(data='hi', buffers=buffers)" + ].join('\n'), 'code'); + this.execute_cell(index); + }); + + // wait for capture + this.waitFor(function () { + return this.evaluate(function () { + return IPython._msgs.length > 0; + }); + }); + + // validate captured buffers js-side + this.then(function () { + var msgs = this.evaluate(function () { + return IPython._msgs; + }); + this.test.assertEquals(msgs.length, 1, "Captured comm message"); + var buffers = msgs[0].buffers; + this.test.assertEquals(buffers.length, 2, "comm message has buffers"); + + // extract attributes to test in evaluate, + // because the raw DataViews can't be passed across + var buf_info = function (index) { + var buf = IPython._msgs[0].buffers[index]; + var data = {}; + data.byteLength = buf.byteLength; + data.bytes = []; + for (var i = 0; i < data.byteLength; i++) { + data.bytes.push(buf.getUint8(i)); + } + return data; + }; + + buf0 = this.evaluate(buf_info, 0); + buf1 = this.evaluate(buf_info, 1); + this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size'); + this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes'); + this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size'); + this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes'); + }); + + // validate captured buffers Python-side + this.then(function () { + var index = this.append_cell([ + "assert len(msgs) == 1, len(msgs)", + "bufs = msgs[0]['buffers']", + "assert len(bufs) == len(buffers), bufs", + "assert bufs[0].bytes == buffers[0], bufs[0].bytes", + "assert bufs[1].bytes == buffers[1], bufs[1].bytes", + "1", + ].join('\n'), 'code'); + this.execute_cell(index); + this.wait_for_output(index); + this.then(function () { + var out = this.get_output_cell(index); + this.test.assertEquals(out['text/plain'], '1', "Python received buffers"); + }); + }); +}); diff --git a/IPython/html/tests/test_serialize.py b/IPython/html/tests/test_serialize.py new file mode 100644 index 0000000..7a88b29 --- /dev/null +++ b/IPython/html/tests/test_serialize.py @@ -0,0 +1,26 @@ +"""Test serialize/deserialize messages with buffers""" + +import os + +import nose.tools as nt + +from IPython.kernel.zmq.session import Session +from ..base.zmqhandlers import ( + serialize_binary_message, + deserialize_binary_message, +) + +def test_serialize_binary(): + s = Session() + msg = s.msg('data_pub', content={'a': 'b'}) + msg['buffers'] = [ os.urandom(3) for i in range(3) ] + bmsg = serialize_binary_message(msg) + nt.assert_is_instance(bmsg, bytes) + +def test_deserialize_binary(): + s = Session() + msg = s.msg('data_pub', content={'a': 'b'}) + msg['buffers'] = [ os.urandom(2) for i in range(3) ] + bmsg = serialize_binary_message(msg) + msg2 = deserialize_binary_message(bmsg) + nt.assert_equal(msg2, msg) diff --git a/IPython/kernel/channels.py b/IPython/kernel/channels.py index c228bab..62ccaf5 100644 --- a/IPython/kernel/channels.py +++ b/IPython/kernel/channels.py @@ -176,7 +176,7 @@ class ZMQSocketChannel(Thread): Unpacks message, and calls handlers with it. """ ident,smsg = self.session.feed_identities(msg) - msg = self.session.unserialize(smsg) + msg = self.session.deserialize(smsg) self.call_handlers(msg) diff --git a/IPython/kernel/comm/comm.py b/IPython/kernel/comm/comm.py index 87371d3..4115f19 100644 --- a/IPython/kernel/comm/comm.py +++ b/IPython/kernel/comm/comm.py @@ -57,7 +57,7 @@ class Comm(LoggingConfigurable): # I am primary, open my peer. self.open(data) - def _publish_msg(self, msg_type, data=None, metadata=None, **keys): + def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys): """Helper for sending a comm message on IOPub""" data = {} if data is None else data metadata = {} if metadata is None else metadata @@ -67,6 +67,7 @@ class Comm(LoggingConfigurable): metadata=json_clean(metadata), parent=self.kernel._parent_header, ident=self.topic, + buffers=buffers, ) def __del__(self): @@ -75,7 +76,7 @@ class Comm(LoggingConfigurable): # publishing messages - def open(self, data=None, metadata=None): + def open(self, data=None, metadata=None, buffers=None): """Open the frontend-side version of this comm""" if data is None: data = self._open_data @@ -86,22 +87,29 @@ class Comm(LoggingConfigurable): comm_manager.register_comm(self) self._closed = False - self._publish_msg('comm_open', data, metadata, target_name=self.target_name) + self._publish_msg('comm_open', + data=data, metadata=metadata, buffers=buffers, + target_name=self.target_name, + ) - def close(self, data=None, metadata=None): + def close(self, data=None, metadata=None, buffers=None): """Close the frontend-side version of this comm""" if self._closed: # only close once return if data is None: data = self._close_data - self._publish_msg('comm_close', data, metadata) + self._publish_msg('comm_close', + data=data, metadata=metadata, buffers=buffers, + ) self.kernel.comm_manager.unregister_comm(self) self._closed = True - def send(self, data=None, metadata=None): + def send(self, data=None, metadata=None, buffers=None): """Send a message to the frontend-side version of this comm""" - self._publish_msg('comm_msg', data, metadata) + self._publish_msg('comm_msg', + data=data, metadata=metadata, buffers=buffers, + ) # registering callbacks diff --git a/IPython/kernel/zmq/kernelbase.py b/IPython/kernel/zmq/kernelbase.py index eb76981..85f7beb 100755 --- a/IPython/kernel/zmq/kernelbase.py +++ b/IPython/kernel/zmq/kernelbase.py @@ -130,7 +130,7 @@ class Kernel(SingletonConfigurable): """dispatch control requests""" idents,msg = self.session.feed_identities(msg, copy=False) try: - msg = self.session.unserialize(msg, content=True, copy=False) + msg = self.session.deserialize(msg, content=True, copy=False) except: self.log.error("Invalid Control Message", exc_info=True) return @@ -165,7 +165,7 @@ class Kernel(SingletonConfigurable): idents,msg = self.session.feed_identities(msg, copy=False) try: - msg = self.session.unserialize(msg, content=True, copy=False) + msg = self.session.deserialize(msg, content=True, copy=False) except: self.log.error("Invalid Message", exc_info=True) return diff --git a/IPython/kernel/zmq/serialize.py b/IPython/kernel/zmq/serialize.py index 18659c3..e014465 100644 --- a/IPython/kernel/zmq/serialize.py +++ b/IPython/kernel/zmq/serialize.py @@ -89,7 +89,7 @@ def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS): buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL)) return buffers -def unserialize_object(buffers, g=None): +def deserialize_object(buffers, g=None): """reconstruct an object serialized by serialize_object from data buffers. Parameters @@ -170,14 +170,14 @@ def unpack_apply_message(bufs, g=None, copy=True): args = [] for i in range(info['nargs']): - arg, arg_bufs = unserialize_object(arg_bufs, g) + arg, arg_bufs = deserialize_object(arg_bufs, g) args.append(arg) args = tuple(args) assert not arg_bufs, "Shouldn't be any arg bufs left over" kwargs = {} for key in info['kw_keys']: - kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g) + kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g) kwargs[key] = kwarg assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over" diff --git a/IPython/kernel/zmq/session.py b/IPython/kernel/zmq/session.py index 1ebdf16..16582bb 100644 --- a/IPython/kernel/zmq/session.py +++ b/IPython/kernel/zmq/session.py @@ -18,6 +18,7 @@ import os import pprint import random import uuid +import warnings from datetime import datetime try: @@ -492,7 +493,7 @@ class Session(Configurable): """Return the nested message dict. This format is different from what is sent over the wire. The - serialize/unserialize methods converts this nested message dict to the wire + serialize/deserialize methods converts this nested message dict to the wire format, which is a list of message parts. """ msg = {} @@ -525,7 +526,7 @@ class Session(Configurable): def serialize(self, msg, ident=None): """Serialize the message components to bytes. - This is roughly the inverse of unserialize. The serialize/unserialize + This is roughly the inverse of deserialize. The serialize/deserialize methods work with full message lists, whereas pack/unpack work with the individual message parts in the message list. @@ -590,7 +591,7 @@ class Session(Configurable): [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, buffer1,buffer2,...] - The serialize/unserialize methods convert the nested message dict into this + The serialize/deserialize methods convert the nested message dict into this format. Parameters @@ -634,6 +635,7 @@ class Session(Configurable): # We got a Message or message dict, not a msg_type so don't # build a new Message. msg = msg_or_type + buffers = buffers or msg.get('buffers', []) else: msg = self.msg(msg_or_type, content=content, parent=parent, header=header, metadata=metadata) @@ -722,7 +724,7 @@ class Session(Configurable): # invalid large messages can cause very expensive string comparisons idents, msg_list = self.feed_identities(msg_list, copy) try: - return idents, self.unserialize(msg_list, content=content, copy=copy) + return idents, self.deserialize(msg_list, content=content, copy=copy) except Exception as e: # TODO: handle it raise e @@ -747,7 +749,7 @@ class Session(Configurable): idents will always be a list of bytes, each of which is a ZMQ identity. msg_list will be a list of bytes or zmq.Messages of the form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and - should be unpackable/unserializable via self.unserialize at this + should be unpackable/unserializable via self.deserialize at this point. """ if copy: @@ -788,10 +790,10 @@ class Session(Configurable): to_cull = random.sample(self.digest_history, n_to_cull) self.digest_history.difference_update(to_cull) - def unserialize(self, msg_list, content=True, copy=True): + def deserialize(self, msg_list, content=True, copy=True): """Unserialize a msg_list to a nested message dict. - This is roughly the inverse of serialize. The serialize/unserialize + This is roughly the inverse of serialize. The serialize/deserialize methods work with full message lists, whereas pack/unpack work with the individual message parts in the message list. @@ -842,10 +844,16 @@ class Session(Configurable): message['content'] = msg_list[4] message['buffers'] = msg_list[5:] - # print("received: %s: %s\n %s" % (message['msg_type'], message['header'], message['content'])) # adapt to the current version return adapt(message) - # print("adapted: %s: %s\n %s" % (adapted['msg_type'], adapted['header'], adapted['content'])) + + def unserialize(self, *args, **kwargs): + warnings.warn( + "Session.unserialize is deprecated. Use Session.deserialize.", + DeprecationWarning, + ) + return self.deserialize(*args, **kwargs) + def test_msg2obj(): am = dict(x=1) diff --git a/IPython/kernel/zmq/tests/test_serialize.py b/IPython/kernel/zmq/tests/test_serialize.py index 2c7fcc1..2ecb302 100644 --- a/IPython/kernel/zmq/tests/test_serialize.py +++ b/IPython/kernel/zmq/tests/test_serialize.py @@ -9,7 +9,7 @@ from collections import namedtuple import nose.tools as nt # from unittest import TestCaes -from IPython.kernel.zmq.serialize import serialize_object, unserialize_object +from IPython.kernel.zmq.serialize import serialize_object, deserialize_object from IPython.testing import decorators as dec from IPython.utils.pickleutil import CannedArray, CannedClass from IPython.utils.py3compat import iteritems @@ -22,7 +22,7 @@ from IPython.parallel import interactive def roundtrip(obj): """roundtrip an object through serialization""" bufs = serialize_object(obj) - obj2, remainder = unserialize_object(bufs) + obj2, remainder = deserialize_object(bufs) nt.assert_equals(remainder, []) return obj2 @@ -70,7 +70,7 @@ def test_roundtrip_buffered(): ]: bufs = serialize_object(obj) nt.assert_equal(len(bufs), 2) - obj2, remainder = unserialize_object(bufs) + obj2, remainder = deserialize_object(bufs) nt.assert_equal(remainder, []) nt.assert_equal(obj, obj2) @@ -82,7 +82,7 @@ def test_numpy(): for dtype in DTYPES: A = new_array(shape, dtype=dtype) bufs = serialize_object(A) - B, r = unserialize_object(bufs) + B, r = deserialize_object(bufs) nt.assert_equal(r, []) nt.assert_equal(A.shape, B.shape) nt.assert_equal(A.dtype, B.dtype) @@ -100,7 +100,7 @@ def test_recarray(): A = new_array(shape, dtype=dtype) bufs = serialize_object(A) - B, r = unserialize_object(bufs) + B, r = deserialize_object(bufs) nt.assert_equal(r, []) nt.assert_equal(A.shape, B.shape) nt.assert_equal(A.dtype, B.dtype) @@ -116,7 +116,7 @@ def test_numpy_in_seq(): bufs = serialize_object((A,1,2,b'hello')) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned[0], CannedArray) - tup, r = unserialize_object(bufs) + tup, r = deserialize_object(bufs) B = tup[0] nt.assert_equal(r, []) nt.assert_equal(A.shape, B.shape) @@ -133,7 +133,7 @@ def test_numpy_in_dict(): bufs = serialize_object(dict(a=A,b=1,c=range(20))) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned['a'], CannedArray) - d, r = unserialize_object(bufs) + d, r = deserialize_object(bufs) B = d['a'] nt.assert_equal(r, []) nt.assert_equal(A.shape, B.shape) @@ -147,7 +147,7 @@ def test_class(): bufs = serialize_object(dict(C=C)) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned['C'], CannedClass) - d, r = unserialize_object(bufs) + d, r = deserialize_object(bufs) C2 = d['C'] nt.assert_equal(C2.a, C.a) @@ -159,7 +159,7 @@ def test_class_oldstyle(): bufs = serialize_object(dict(C=C)) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned['C'], CannedClass) - d, r = unserialize_object(bufs) + d, r = deserialize_object(bufs) C2 = d['C'] nt.assert_equal(C2.a, C.a) @@ -168,7 +168,7 @@ def test_tuple(): bufs = serialize_object(tup) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned, tuple) - t2, r = unserialize_object(bufs) + t2, r = deserialize_object(bufs) nt.assert_equal(t2[0](t2[1]), tup[0](tup[1])) point = namedtuple('point', 'x y') @@ -178,7 +178,7 @@ def test_namedtuple(): bufs = serialize_object(p) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned, point) - p2, r = unserialize_object(bufs, globals()) + p2, r = deserialize_object(bufs, globals()) nt.assert_equal(p2.x, p.x) nt.assert_equal(p2.y, p.y) @@ -187,7 +187,7 @@ def test_list(): bufs = serialize_object(lis) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned, list) - l2, r = unserialize_object(bufs) + l2, r = deserialize_object(bufs) nt.assert_equal(l2[0](l2[1]), lis[0](lis[1])) def test_class_inheritance(): @@ -202,7 +202,7 @@ def test_class_inheritance(): bufs = serialize_object(dict(D=D)) canned = pickle.loads(bufs[0]) nt.assert_is_instance(canned['D'], CannedClass) - d, r = unserialize_object(bufs) + d, r = deserialize_object(bufs) D2 = d['D'] nt.assert_equal(D2.a, D.a) nt.assert_equal(D2.b, D.b) diff --git a/IPython/kernel/zmq/tests/test_session.py b/IPython/kernel/zmq/tests/test_session.py index 018705a..7df16da 100644 --- a/IPython/kernel/zmq/tests/test_session.py +++ b/IPython/kernel/zmq/tests/test_session.py @@ -60,7 +60,7 @@ class TestSession(SessionTestCase): msg = self.session.msg('execute', content=dict(a=10, b=1.1)) msg_list = self.session.serialize(msg, ident=b'foo') ident, msg_list = self.session.feed_identities(msg_list) - new_msg = self.session.unserialize(msg_list) + new_msg = self.session.deserialize(msg_list) self.assertEqual(ident[0], b'foo') self.assertEqual(new_msg['msg_id'],msg['msg_id']) self.assertEqual(new_msg['msg_type'],msg['msg_type']) @@ -82,7 +82,7 @@ class TestSession(SessionTestCase): self.session.send(A, msg, ident=b'foo', buffers=[b'bar']) ident, msg_list = self.session.feed_identities(B.recv_multipart()) - new_msg = self.session.unserialize(msg_list) + new_msg = self.session.deserialize(msg_list) self.assertEqual(ident[0], b'foo') self.assertEqual(new_msg['msg_id'],msg['msg_id']) self.assertEqual(new_msg['msg_type'],msg['msg_type']) @@ -100,7 +100,7 @@ class TestSession(SessionTestCase): self.session.send(A, None, content=content, parent=parent, header=header, metadata=metadata, ident=b'foo', buffers=[b'bar']) ident, msg_list = self.session.feed_identities(B.recv_multipart()) - new_msg = self.session.unserialize(msg_list) + new_msg = self.session.deserialize(msg_list) self.assertEqual(ident[0], b'foo') self.assertEqual(new_msg['msg_id'],msg['msg_id']) self.assertEqual(new_msg['msg_type'],msg['msg_type']) @@ -263,7 +263,7 @@ class TestSession(SessionTestCase): p = session.msg('msg') msg = session.msg('msg', content=content, metadata=metadata, parent=p['header']) smsg = session.serialize(msg) - msg2 = session.unserialize(session.feed_identities(smsg)[1]) + msg2 = session.deserialize(session.feed_identities(smsg)[1]) assert isinstance(msg2['header']['date'], datetime) self.assertEqual(msg['header'], msg2['header']) self.assertEqual(msg['parent_header'], msg2['parent_header']) @@ -305,7 +305,7 @@ class TestSession(SessionTestCase): self.session.send_raw(A, msg_list, ident=b'foo') ident, new_msg_list = self.session.feed_identities(B.recv_multipart()) - new_msg = self.session.unserialize(new_msg_list) + new_msg = self.session.deserialize(new_msg_list) self.assertEqual(ident[0], b'foo') self.assertEqual(new_msg['msg_type'],msg['msg_type']) self.assertEqual(new_msg['header'],msg['header']) diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 5fec363..44c6d1a 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -801,7 +801,7 @@ class Client(HasTraits): # construct result: if content['status'] == 'ok': - self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0] + self.results[msg_id] = serialize.deserialize_object(msg['buffers'])[0] elif content['status'] == 'aborted': self.results[msg_id] = error.TaskAborted(msg_id) elif content['status'] == 'resubmitted': @@ -903,7 +903,7 @@ class Client(HasTraits): elif msg_type == 'execute_result': md['execute_result'] = content elif msg_type == 'data_message': - data, remainder = serialize.unserialize_object(msg['buffers']) + data, remainder = serialize.deserialize_object(msg['buffers']) md['data'].update(data) elif msg_type == 'status': # idle message comes after all outputs @@ -1612,7 +1612,7 @@ class Client(HasTraits): if rcontent['status'] == 'ok': if header['msg_type'] == 'apply_reply': - res,buffers = serialize.unserialize_object(buffers) + res,buffers = serialize.deserialize_object(buffers) elif header['msg_type'] == 'execute_reply': res = ExecuteReply(msg_id, rcontent, md) else: diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index e2cc769..a20f4d9 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -523,7 +523,7 @@ class Hub(SessionFactory): return client_id = idents[0] try: - msg = self.session.unserialize(msg, content=True) + msg = self.session.deserialize(msg, content=True) except Exception: content = error.wrap_exception() self.log.error("Bad Query Message: %r", msg, exc_info=True) @@ -588,7 +588,7 @@ class Hub(SessionFactory): return queue_id, client_id = idents[:2] try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except Exception: self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True) return @@ -636,7 +636,7 @@ class Hub(SessionFactory): client_id, queue_id = idents[:2] try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except Exception: self.log.error("queue::engine %r sent invalid message to %r: %r", queue_id, client_id, msg, exc_info=True) @@ -690,7 +690,7 @@ class Hub(SessionFactory): client_id = idents[0] try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except Exception: self.log.error("task::client %r sent invalid task message: %r", client_id, msg, exc_info=True) @@ -740,7 +740,7 @@ class Hub(SessionFactory): """save the result of a completed task.""" client_id = idents[0] try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except Exception: self.log.error("task::invalid task result message send to %r: %r", client_id, msg, exc_info=True) @@ -794,7 +794,7 @@ class Hub(SessionFactory): def save_task_destination(self, idents, msg): try: - msg = self.session.unserialize(msg, content=True) + msg = self.session.deserialize(msg, content=True) except Exception: self.log.error("task::invalid task tracking message", exc_info=True) return @@ -831,7 +831,7 @@ class Hub(SessionFactory): """save an iopub message into the db""" # print (topics) try: - msg = self.session.unserialize(msg, content=True) + msg = self.session.deserialize(msg, content=True) except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index fbf07c4..4f13a1b 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -251,7 +251,7 @@ class TaskScheduler(SessionFactory): self.log.warn("task::Invalid Message: %r",msg) return try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except ValueError: self.log.warn("task::Unauthorized message from: %r"%idents) return @@ -270,7 +270,7 @@ class TaskScheduler(SessionFactory): self.log.warn("task::Invalid Message: %r",msg) return try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except ValueError: self.log.warn("task::Unauthorized message from: %r"%idents) return @@ -375,7 +375,7 @@ class TaskScheduler(SessionFactory): self.notifier_stream.flush() try: idents, msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unserialize(msg, content=False, copy=False) + msg = self.session.deserialize(msg, content=False, copy=False) except Exception: self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) return @@ -621,7 +621,7 @@ class TaskScheduler(SessionFactory): """dispatch method for result replies""" try: idents,msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unserialize(msg, content=False, copy=False) + msg = self.session.deserialize(msg, content=False, copy=False) engine = idents[0] try: idx = self.targets.index(engine) diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index a72b5b8..32146b4 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -159,7 +159,7 @@ class EngineFactory(RegistrationFactory): loop = self.loop identity = self.bident idents,msg = self.session.feed_identities(msg) - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) content = msg['content'] info = self.connection_info diff --git a/setupbase.py b/setupbase.py index 5e85d4b..7a9da5e 100644 --- a/setupbase.py +++ b/setupbase.py @@ -161,7 +161,8 @@ def find_package_data(): pjoin(components, "requirejs", "require.js"), pjoin(components, "underscore", "underscore-min.js"), pjoin(components, "moment", "moment.js"), - pjoin(components, "moment", "min","moment.min.js"), + pjoin(components, "moment", "min", "moment.min.js"), + pjoin(components, "text-encoding", "lib", "encoding.js"), ]) # Ship all of Codemirror's CSS and JS