diff --git a/IPython/zmq/serialize.py b/IPython/zmq/serialize.py index a758f86..cb284fe 100644 --- a/IPython/zmq/serialize.py +++ b/IPython/zmq/serialize.py @@ -32,6 +32,7 @@ except: # IPython imports from IPython.utils import py3compat +from IPython.utils.data import flatten from IPython.utils.pickleutil import ( can, uncan, can_sequence, uncan_sequence, CannedObject ) @@ -123,7 +124,11 @@ def unserialize_object(buffers, g=None): (newobj, bufs) : unpacked object, and the list of remaining unused buffers. """ bufs = list(buffers) - canned = pickle.loads(bufs.pop(0)) + pobj = bufs.pop(0) + if not isinstance(pobj, bytes): + # a zmq message + pobj = bytes(pobj) + canned = pickle.loads(pobj) if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS: for c in canned: _restore_buffers(c, bufs) @@ -143,38 +148,57 @@ def unserialize_object(buffers, g=None): def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS): """pack up a function, args, and kwargs to be sent over the wire - as a series of buffers. Any object whose data is larger than `threshold` - will not have their data copied (currently only numpy arrays support zero-copy) + Each element of args/kwargs will be canned for special treatment, + but inspection will not go any deeper than that. + + Any object whose data is larger than `threshold` will not have their data copied + (only numpy arrays and bytes/buffers support zero-copy) + + Message will be a list of bytes/buffers of the format: + + [ cf, pinfo, , ] + + With length at least two + len(args) + len(kwargs) """ + + arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args) + + kw_keys = sorted(kwargs.keys()) + kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys) + + info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys) + msg = [pickle.dumps(can(f),-1)] - databuffers = [] # for large objects - sargs = serialize_object(args, buffer_threshold, item_threshold) - msg.append(sargs[0]) - databuffers.extend(sargs[1:]) - skwargs = serialize_object(kwargs, buffer_threshold, item_threshold) - msg.append(skwargs[0]) - databuffers.extend(skwargs[1:]) - msg.extend(databuffers) + msg.append(pickle.dumps(info, -1)) + msg.extend(arg_bufs) + msg.extend(kwarg_bufs) + return msg def unpack_apply_message(bufs, g=None, copy=True): """unpack f,args,kwargs from buffers packed by pack_apply_message() Returns: original f,args,kwargs""" bufs = list(bufs) # allow us to pop - assert len(bufs) >= 3, "not enough buffers!" + assert len(bufs) >= 2, "not enough buffers!" if not copy: - for i in range(3): + for i in range(2): bufs[i] = bufs[i].bytes f = uncan(pickle.loads(bufs.pop(0)), g) - # sargs = bufs.pop(0) - # pop kwargs out, so first n-elements are args, serialized - skwargs = bufs.pop(1) - args, bufs = unserialize_object(bufs, g) - # put skwargs back in as the first element - bufs.insert(0, skwargs) - kwargs, bufs = unserialize_object(bufs, g) - - assert not bufs, "Shouldn't be any data left over" + info = pickle.loads(bufs.pop(0)) + arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:] + + args = [] + for i in range(info['nargs']): + arg, arg_bufs = unserialize_object(arg_bufs, g) + args.append(arg) + args = tuple(args) + assert not arg_bufs, "Shouldn't be any arg bufs left over" + + kwargs = {} + for key in info['kw_keys']: + kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g) + kwargs[key] = kwarg + assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over" return f,args,kwargs