Show More
@@ -32,6 +32,7 b' except:' | |||||
32 |
|
32 | |||
33 | # IPython imports |
|
33 | # IPython imports | |
34 | from IPython.utils import py3compat |
|
34 | from IPython.utils import py3compat | |
|
35 | from IPython.utils.data import flatten | |||
35 | from IPython.utils.pickleutil import ( |
|
36 | from IPython.utils.pickleutil import ( | |
36 | can, uncan, can_sequence, uncan_sequence, CannedObject |
|
37 | can, uncan, can_sequence, uncan_sequence, CannedObject | |
37 | ) |
|
38 | ) | |
@@ -123,7 +124,11 b' def unserialize_object(buffers, g=None):' | |||||
123 | (newobj, bufs) : unpacked object, and the list of remaining unused buffers. |
|
124 | (newobj, bufs) : unpacked object, and the list of remaining unused buffers. | |
124 | """ |
|
125 | """ | |
125 | bufs = list(buffers) |
|
126 | bufs = list(buffers) | |
126 |
|
|
127 | pobj = bufs.pop(0) | |
|
128 | if not isinstance(pobj, bytes): | |||
|
129 | # a zmq message | |||
|
130 | pobj = bytes(pobj) | |||
|
131 | canned = pickle.loads(pobj) | |||
127 | if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS: |
|
132 | if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS: | |
128 | for c in canned: |
|
133 | for c in canned: | |
129 | _restore_buffers(c, bufs) |
|
134 | _restore_buffers(c, bufs) | |
@@ -143,38 +148,57 b' def unserialize_object(buffers, g=None):' | |||||
143 | def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS): |
|
148 | def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS): | |
144 | """pack up a function, args, and kwargs to be sent over the wire |
|
149 | """pack up a function, args, and kwargs to be sent over the wire | |
145 |
|
150 | |||
146 | as a series of buffers. Any object whose data is larger than `threshold` |
|
151 | Each element of args/kwargs will be canned for special treatment, | |
147 | will not have their data copied (currently only numpy arrays support zero-copy) |
|
152 | but inspection will not go any deeper than that. | |
|
153 | ||||
|
154 | Any object whose data is larger than `threshold` will not have their data copied | |||
|
155 | (only numpy arrays and bytes/buffers support zero-copy) | |||
|
156 | ||||
|
157 | Message will be a list of bytes/buffers of the format: | |||
|
158 | ||||
|
159 | [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ] | |||
|
160 | ||||
|
161 | With length at least two + len(args) + len(kwargs) | |||
148 | """ |
|
162 | """ | |
|
163 | ||||
|
164 | arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args) | |||
|
165 | ||||
|
166 | kw_keys = sorted(kwargs.keys()) | |||
|
167 | kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys) | |||
|
168 | ||||
|
169 | info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys) | |||
|
170 | ||||
149 | msg = [pickle.dumps(can(f),-1)] |
|
171 | msg = [pickle.dumps(can(f),-1)] | |
150 | databuffers = [] # for large objects |
|
172 | msg.append(pickle.dumps(info, -1)) | |
151 | sargs = serialize_object(args, buffer_threshold, item_threshold) |
|
173 | msg.extend(arg_bufs) | |
152 |
msg. |
|
174 | msg.extend(kwarg_bufs) | |
153 | databuffers.extend(sargs[1:]) |
|
175 | ||
154 | skwargs = serialize_object(kwargs, buffer_threshold, item_threshold) |
|
|||
155 | msg.append(skwargs[0]) |
|
|||
156 | databuffers.extend(skwargs[1:]) |
|
|||
157 | msg.extend(databuffers) |
|
|||
158 | return msg |
|
176 | return msg | |
159 |
|
177 | |||
160 | def unpack_apply_message(bufs, g=None, copy=True): |
|
178 | def unpack_apply_message(bufs, g=None, copy=True): | |
161 | """unpack f,args,kwargs from buffers packed by pack_apply_message() |
|
179 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |
162 | Returns: original f,args,kwargs""" |
|
180 | Returns: original f,args,kwargs""" | |
163 | bufs = list(bufs) # allow us to pop |
|
181 | bufs = list(bufs) # allow us to pop | |
164 |
assert len(bufs) >= |
|
182 | assert len(bufs) >= 2, "not enough buffers!" | |
165 | if not copy: |
|
183 | if not copy: | |
166 |
for i in range( |
|
184 | for i in range(2): | |
167 | bufs[i] = bufs[i].bytes |
|
185 | bufs[i] = bufs[i].bytes | |
168 | f = uncan(pickle.loads(bufs.pop(0)), g) |
|
186 | f = uncan(pickle.loads(bufs.pop(0)), g) | |
169 | # sargs = bufs.pop(0) |
|
187 | info = pickle.loads(bufs.pop(0)) | |
170 | # pop kwargs out, so first n-elements are args, serialized |
|
188 | arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:] | |
171 | skwargs = bufs.pop(1) |
|
189 | ||
172 | args, bufs = unserialize_object(bufs, g) |
|
190 | args = [] | |
173 | # put skwargs back in as the first element |
|
191 | for i in range(info['nargs']): | |
174 | bufs.insert(0, skwargs) |
|
192 | arg, arg_bufs = unserialize_object(arg_bufs, g) | |
175 | kwargs, bufs = unserialize_object(bufs, g) |
|
193 | args.append(arg) | |
176 |
|
194 | args = tuple(args) | ||
177 |
assert not bufs, "Shouldn't be any |
|
195 | assert not arg_bufs, "Shouldn't be any arg bufs left over" | |
|
196 | ||||
|
197 | kwargs = {} | |||
|
198 | for key in info['kw_keys']: | |||
|
199 | kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g) | |||
|
200 | kwargs[key] = kwarg | |||
|
201 | assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over" | |||
178 |
|
202 | |||
179 | return f,args,kwargs |
|
203 | return f,args,kwargs | |
180 |
|
204 |
General Comments 0
You need to be logged in to leave comments.
Login now