##// END OF EJS Templates
support buffers in comm messages...
MinRK -
Show More
@@ -4,6 +4,7 b''
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7 import struct
7
8
8 try:
9 try:
9 from urllib.parse import urlparse # Py 3
10 from urllib.parse import urlparse # Py 3
@@ -28,6 +29,61 b' from IPython.utils.py3compat import PY3, cast_unicode'
28 from .handlers import IPythonHandler
29 from .handlers import IPythonHandler
29
30
30
31
32 def serialize_binary_message(msg):
33 """serialize a message as a binary blob
34
35 Header:
36
37 4 bytes: number of msg parts (nbufs) as 32b int
38 4 * nbufs bytes: offset for each buffer as integer as 32b int
39
40 Offsets are from the start of the buffer, including the header.
41
42 Returns
43 -------
44
45 The message serialized to bytes.
46
47 """
48 buffers = msg.pop('buffers')
49 bmsg = json.dumps(msg, default=date_default).encode('utf8')
50 buffers.insert(0, bmsg)
51 nbufs = len(buffers)
52 sizes = (len(buf) for buf in buffers)
53 offsets = [4 * (nbufs + 1)]
54 for buf in buffers[:-1]:
55 offsets.append(offsets[-1] + len(buf))
56 offsets_buf = struct.pack('!' + 'i' * (nbufs + 1), nbufs, *offsets)
57 buffers.insert(0, offsets_buf)
58 return b''.join(buffers)
59
60
61 def unserialize_binary_message(bmsg):
62 """unserialize a message from a binary blog
63
64 Header:
65
66 4 bytes: number of msg parts (nbufs) as 32b int
67 4 * nbufs bytes: offset for each buffer as integer as 32b int
68
69 Offsets are from the start of the buffer, including the header.
70
71 Returns
72 -------
73
74 message dictionary
75 """
76 nbufs = struct.unpack('i', bmsg[:4])[0]
77 offsets = list(struct.unpack('!' + 'i' * nbufs, bmsg[4:4*(nbufs+1)]))
78 offsets.append(None)
79 bufs = []
80 for start, stop in zip(offsets[:-1], offsets[1:]):
81 bufs.append(bmsg[start:stop])
82 msg = json.loads(bufs[0])
83 msg['buffers'] = bufs[1:]
84 return msg
85
86
31 class ZMQStreamHandler(websocket.WebSocketHandler):
87 class ZMQStreamHandler(websocket.WebSocketHandler):
32
88
33 def check_origin(self, origin):
89 def check_origin(self, origin):
@@ -92,8 +148,12 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
92 msg['parent_header'].pop('date')
148 msg['parent_header'].pop('date')
93 except KeyError:
149 except KeyError:
94 pass
150 pass
95 msg.pop('buffers')
151 if msg['buffers']:
96 return json.dumps(msg, default=date_default)
152 buf = serialize_binary_message(msg)
153 return buf
154 else:
155 smsg = json.dumps(msg, default=date_default)
156 return cast_unicode(smsg)
97
157
98 def _on_zmq_reply(self, msg_list):
158 def _on_zmq_reply(self, msg_list):
99 # Sometimes this gets triggered when the on_close method is scheduled in the
159 # Sometimes this gets triggered when the on_close method is scheduled in the
@@ -104,7 +164,7 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
104 except Exception:
164 except Exception:
105 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
165 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
106 else:
166 else:
107 self.write_message(msg)
167 self.write_message(msg, binary=isinstance(msg, bytes))
108
168
109 def allow_draft76(self):
169 def allow_draft76(self):
110 """Allow draft 76, until browsers such as Safari update to RFC 6455.
170 """Allow draft 76, until browsers such as Safari update to RFC 6455.
@@ -553,7 +553,44 b' define(['
553 ], callback, errback
553 ], callback, errback
554 );
554 );
555 };
555 };
556
556
557 var decode_utf8 = function (array) {
558 // Decode UTF8 Uint8Array to String
559 // I can't believe Javascript makes us do this
560 // From http://stackoverflow.com/questions/17191945
561
562 var out, i, len, c;
563 var char2, char3;
564
565 out = "";
566 len = array.length;
567 i = 0;
568 while(i < len) {
569 c = array[i++];
570 switch(c >> 4) {
571 case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
572 // 0xxxxxxx
573 out += String.fromCharCode(c);
574 break;
575 case 12: case 13:
576 // 110x xxxx 10xx xxxx
577 char2 = array[i++];
578 out += String.fromCharCode(((c & 0x1F) << 6) | (char2 & 0x3F));
579 break;
580 case 14:
581 // 1110 xxxx 10xx xxxx 10xx xxxx
582 char2 = array[i++];
583 char3 = array[i++];
584 out += String.fromCharCode(((c & 0x0F) << 12) |
585 ((char2 & 0x3F) << 6) |
586 ((char3 & 0x3F) << 0));
587 break;
588 }
589 }
590
591 return out;
592 };
593
557 var utils = {
594 var utils = {
558 regex_split : regex_split,
595 regex_split : regex_split,
559 uuid : uuid,
596 uuid : uuid,
@@ -579,6 +616,7 b' define(['
579 ajax_error_msg : ajax_error_msg,
616 ajax_error_msg : ajax_error_msg,
580 log_ajax_error : log_ajax_error,
617 log_ajax_error : log_ajax_error,
581 requireCodeMirrorMode : requireCodeMirrorMode,
618 requireCodeMirrorMode : requireCodeMirrorMode,
619 decode_utf8: decode_utf8,
582 };
620 };
583
621
584 // Backwards compatability.
622 // Backwards compatability.
@@ -846,12 +846,60 b' define(['
846 }
846 }
847 };
847 };
848
848
849 Kernel.prototype._unserialize_binary_message = function(blob, callback) {
850 // unserialize the binary message format
851 // callback will be called with a message whose buffers attribute
852 // will be an array of DataViews.
853 var reader = new FileReader();
854 reader.onload = function(e) {
855 var data = new DataView(this.result);
856 // read the header: 1 + nbufs 32b integers
857 var nbufs = data.getInt32(0);
858 var offsets = [];
859 var i;
860 for (i = 1; i <= nbufs; i++) {
861 offsets.push(data.getInt32(i * 4));
862 }
863 // the first chunk is the message as utf-8 JSON
864 var msg = $.parseJSON(
865 utis.decode_utf8(
866 new Uint8Array(this.result.slice(offsets[0], offsets[1]))
867 )
868 );
869 // the remaining chunks are stored as DataViews in msg.buffers
870 msg.buffers = [];
871 var start, stop;
872 for (i = 1; i < nbufs; i++) {
873 start = offsets[i];
874 stop = offsets[i+1];
875 msg.buffers.push(new DataView(this.result.slice(start, stop)));
876 }
877 callback(msg);
878 };
879 reader.readAsArrayBuffer(blob);
880 };
881
882
883 Kernel.prototype._unserialize_msg = function (e, callback) {
884 // unserialze a message and pass the unpacked message object to callback
885 if (typeof e.data === "string") {
886 // text JSON message
887 callback($.parseJSON(e.data));
888 } else {
889 // binary message
890 this._unserialize_binary_message(e.data, callback);
891 }
892 };
893
849 /**
894 /**
850 * @function _handle_shell_reply
895 * @function _handle_shell_reply
851 */
896 */
852 Kernel.prototype._handle_shell_reply = function (e) {
897 Kernel.prototype._handle_shell_reply = function (e) {
853 var reply = $.parseJSON(e.data);
898 this._unserialize_msg(e, $.proxy(this._finish_shell_reply, this));
854 this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply});
899 };
900
901 Kernel.prototype._finish_shell_reply = function (reply) {
902 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
855 var content = reply.content;
903 var content = reply.content;
856 var metadata = reply.metadata;
904 var metadata = reply.metadata;
857 var parent_id = reply.parent_header.msg_id;
905 var parent_id = reply.parent_header.msg_id;
@@ -978,8 +1026,11 b' define(['
978 * @function _handle_iopub_message
1026 * @function _handle_iopub_message
979 */
1027 */
980 Kernel.prototype._handle_iopub_message = function (e) {
1028 Kernel.prototype._handle_iopub_message = function (e) {
981 var msg = $.parseJSON(e.data);
1029 this._unserialize_msg(e, $.proxy(this._finish_iopub_message, this));
1030 };
982
1031
1032
1033 Kernel.prototype._finish_iopub_message = function (msg) {
983 var handler = this.get_iopub_handler(msg.header.msg_type);
1034 var handler = this.get_iopub_handler(msg.header.msg_type);
984 if (handler !== undefined) {
1035 if (handler !== undefined) {
985 handler(msg);
1036 handler(msg);
@@ -990,7 +1041,11 b' define(['
990 * @function _handle_input_request
1041 * @function _handle_input_request
991 */
1042 */
992 Kernel.prototype._handle_input_request = function (e) {
1043 Kernel.prototype._handle_input_request = function (e) {
993 var request = $.parseJSON(e.data);
1044 this._unserialize_msg(e, $.proxy(this._finish_input_request, this));
1045 };
1046
1047
1048 Kernel.prototype._finish_input_request = function (request) {
994 var header = request.header;
1049 var header = request.header;
995 var content = request.content;
1050 var content = request.content;
996 var metadata = request.metadata;
1051 var metadata = request.metadata;
@@ -57,7 +57,7 b' class Comm(LoggingConfigurable):'
57 # I am primary, open my peer.
57 # I am primary, open my peer.
58 self.open(data)
58 self.open(data)
59
59
60 def _publish_msg(self, msg_type, data=None, metadata=None, **keys):
60 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
61 """Helper for sending a comm message on IOPub"""
61 """Helper for sending a comm message on IOPub"""
62 data = {} if data is None else data
62 data = {} if data is None else data
63 metadata = {} if metadata is None else metadata
63 metadata = {} if metadata is None else metadata
@@ -67,6 +67,7 b' class Comm(LoggingConfigurable):'
67 metadata=json_clean(metadata),
67 metadata=json_clean(metadata),
68 parent=self.kernel._parent_header,
68 parent=self.kernel._parent_header,
69 ident=self.topic,
69 ident=self.topic,
70 buffers=buffers,
70 )
71 )
71
72
72 def __del__(self):
73 def __del__(self):
@@ -75,7 +76,7 b' class Comm(LoggingConfigurable):'
75
76
76 # publishing messages
77 # publishing messages
77
78
78 def open(self, data=None, metadata=None):
79 def open(self, data=None, metadata=None, buffers=None):
79 """Open the frontend-side version of this comm"""
80 """Open the frontend-side version of this comm"""
80 if data is None:
81 if data is None:
81 data = self._open_data
82 data = self._open_data
@@ -86,22 +87,29 b' class Comm(LoggingConfigurable):'
86
87
87 comm_manager.register_comm(self)
88 comm_manager.register_comm(self)
88 self._closed = False
89 self._closed = False
89 self._publish_msg('comm_open', data, metadata, target_name=self.target_name)
90 self._publish_msg('comm_open',
91 data=data, metadata=metadata, buffers=buffers,
92 target_name=self.target_name,
93 )
90
94
91 def close(self, data=None, metadata=None):
95 def close(self, data=None, metadata=None, buffers=None):
92 """Close the frontend-side version of this comm"""
96 """Close the frontend-side version of this comm"""
93 if self._closed:
97 if self._closed:
94 # only close once
98 # only close once
95 return
99 return
96 if data is None:
100 if data is None:
97 data = self._close_data
101 data = self._close_data
98 self._publish_msg('comm_close', data, metadata)
102 self._publish_msg('comm_close',
103 data=data, metadata=metadata, buffers=buffers,
104 )
99 self.kernel.comm_manager.unregister_comm(self)
105 self.kernel.comm_manager.unregister_comm(self)
100 self._closed = True
106 self._closed = True
101
107
102 def send(self, data=None, metadata=None):
108 def send(self, data=None, metadata=None, buffers=None):
103 """Send a message to the frontend-side version of this comm"""
109 """Send a message to the frontend-side version of this comm"""
104 self._publish_msg('comm_msg', data, metadata)
110 self._publish_msg('comm_msg',
111 data=data, metadata=metadata, buffers=buffers,
112 )
105
113
106 # registering callbacks
114 # registering callbacks
107
115
General Comments 0
You need to be logged in to leave comments. Login now