##// END OF EJS Templates
serialize elements of args/kwargs in pack_apply message...
MinRK -
Show More
@@ -32,6 +32,7 b' except:'
32
32
33 # IPython imports
33 # IPython imports
34 from IPython.utils import py3compat
34 from IPython.utils import py3compat
35 from IPython.utils.data import flatten
35 from IPython.utils.pickleutil import (
36 from IPython.utils.pickleutil import (
36 can, uncan, can_sequence, uncan_sequence, CannedObject
37 can, uncan, can_sequence, uncan_sequence, CannedObject
37 )
38 )
@@ -123,7 +124,11 b' def unserialize_object(buffers, g=None):'
123 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
124 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
124 """
125 """
125 bufs = list(buffers)
126 bufs = list(buffers)
126 canned = pickle.loads(bufs.pop(0))
127 pobj = bufs.pop(0)
128 if not isinstance(pobj, bytes):
129 # a zmq message
130 pobj = bytes(pobj)
131 canned = pickle.loads(pobj)
127 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
132 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
128 for c in canned:
133 for c in canned:
129 _restore_buffers(c, bufs)
134 _restore_buffers(c, bufs)
@@ -143,38 +148,57 b' def unserialize_object(buffers, g=None):'
143 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
148 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
144 """pack up a function, args, and kwargs to be sent over the wire
149 """pack up a function, args, and kwargs to be sent over the wire
145
150
146 as a series of buffers. Any object whose data is larger than `threshold`
151 Each element of args/kwargs will be canned for special treatment,
147 will not have their data copied (currently only numpy arrays support zero-copy)
152 but inspection will not go any deeper than that.
153
154 Any object whose data is larger than `threshold` will not have their data copied
155 (only numpy arrays and bytes/buffers support zero-copy)
156
157 Message will be a list of bytes/buffers of the format:
158
159 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
160
161 With length at least two + len(args) + len(kwargs)
148 """
162 """
163
164 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
165
166 kw_keys = sorted(kwargs.keys())
167 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
168
169 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
170
149 msg = [pickle.dumps(can(f),-1)]
171 msg = [pickle.dumps(can(f),-1)]
150 databuffers = [] # for large objects
172 msg.append(pickle.dumps(info, -1))
151 sargs = serialize_object(args, buffer_threshold, item_threshold)
173 msg.extend(arg_bufs)
152 msg.append(sargs[0])
174 msg.extend(kwarg_bufs)
153 databuffers.extend(sargs[1:])
175
154 skwargs = serialize_object(kwargs, buffer_threshold, item_threshold)
155 msg.append(skwargs[0])
156 databuffers.extend(skwargs[1:])
157 msg.extend(databuffers)
158 return msg
176 return msg
159
177
160 def unpack_apply_message(bufs, g=None, copy=True):
178 def unpack_apply_message(bufs, g=None, copy=True):
161 """unpack f,args,kwargs from buffers packed by pack_apply_message()
179 """unpack f,args,kwargs from buffers packed by pack_apply_message()
162 Returns: original f,args,kwargs"""
180 Returns: original f,args,kwargs"""
163 bufs = list(bufs) # allow us to pop
181 bufs = list(bufs) # allow us to pop
164 assert len(bufs) >= 3, "not enough buffers!"
182 assert len(bufs) >= 2, "not enough buffers!"
165 if not copy:
183 if not copy:
166 for i in range(3):
184 for i in range(2):
167 bufs[i] = bufs[i].bytes
185 bufs[i] = bufs[i].bytes
168 f = uncan(pickle.loads(bufs.pop(0)), g)
186 f = uncan(pickle.loads(bufs.pop(0)), g)
169 # sargs = bufs.pop(0)
187 info = pickle.loads(bufs.pop(0))
170 # pop kwargs out, so first n-elements are args, serialized
188 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
171 skwargs = bufs.pop(1)
189
172 args, bufs = unserialize_object(bufs, g)
190 args = []
173 # put skwargs back in as the first element
191 for i in range(info['nargs']):
174 bufs.insert(0, skwargs)
192 arg, arg_bufs = unserialize_object(arg_bufs, g)
175 kwargs, bufs = unserialize_object(bufs, g)
193 args.append(arg)
176
194 args = tuple(args)
177 assert not bufs, "Shouldn't be any data left over"
195 assert not arg_bufs, "Shouldn't be any arg bufs left over"
196
197 kwargs = {}
198 for key in info['kw_keys']:
199 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
200 kwargs[key] = kwarg
201 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
178
202
179 return f,args,kwargs
203 return f,args,kwargs
180
204
General Comments 0
You need to be logged in to leave comments. Login now