serialize.py
181 lines
| 5.8 KiB
| text/x-python
|
PythonLexer
/ ipython_kernel / serialize.py
MinRK
|
r17044 | """serialization utilities for apply messages""" | ||
MinRK
|
r6788 | |||
MinRK
|
r17044 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
MinRK
|
r6788 | |||
# Prefer the C-accelerated cPickle module where it exists (Python 2) and
# fall back to the standard pickle module otherwise.
try:
    import cPickle
    pickle = cPickle
except ImportError:
    # Only a missing module should trigger the fallback; a bare ``except``
    # would also swallow KeyboardInterrupt/SystemExit and unrelated errors.
    cPickle = None
    import pickle
Min RK
|
r21171 | from itertools import chain | ||
Min RK
|
r19204 | from IPython.utils.py3compat import PY3, buffer_to_bytes_py2 | ||
Min RK
|
r21070 | from ipython_kernel.pickleutil import ( | ||
MinRK
|
r9712 | can, uncan, can_sequence, uncan_sequence, CannedObject, | ||
MinRK
|
r17044 | istype, sequence_types, PICKLE_PROTOCOL, | ||
MinRK
|
r7969 | ) | ||
Min RK
|
r20955 | from jupyter_client.session import MAX_ITEMS, MAX_BYTES | ||
Min RK
|
r20954 | |||
MinRK
|
r6788 | |||
Min RK
|
# Python 3 has no ``buffer`` builtin; alias the name to memoryview so the
# ``isinstance(buf, buffer)`` check in _extract_buffers works there as well.
if PY3:
    buffer = memoryview
#----------------------------------------------------------------------------- | ||||
# Serialization Functions | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r7967 | |||
MinRK
|
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull buffers larger than a threshold out of a canned object.

    Parameters
    ----------
    obj : object
        Only CannedObject instances with a non-empty ``buffers`` list are
        modified; any other object is left untouched.
    threshold : int
        Size in bytes above which a buffer is extracted instead of pickled.

    Returns
    -------
    list
        The extracted buffers, in the order they appear in ``obj.buffers``.
        Every extracted slot of ``obj.buffers`` is replaced by None, which is
        the marker _restore_buffers looks for when putting them back.
    """
    buffers = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            # len() of a memoryview counts items rather than bytes, so use
            # nbytes when the buffer exposes it and fall back to len()
            # for plain bytes and other objects without that attribute.
            nbytes = getattr(buf, 'nbytes', None)
            if nbytes is None:
                nbytes = len(buf)
            if nbytes > threshold:
                # buffer larger than threshold, prevent pickling
                obj.buffers[i] = None
                buffers.append(buf)
            elif isinstance(buf, buffer):
                # buffer too small for separate send, coerce to bytes
                # because pickling buffer objects just results in broken pointers
                obj.buffers[i] = bytes(buf)
    return buffers
def _restore_buffers(obj, buffers):
    """Put back the buffers that _extract_buffers removed from ``obj``.

    Every None entry in ``obj.buffers`` is filled, in order, with the next
    item popped from the front of ``buffers``.  ``buffers`` is consumed in
    place so that the caller can see which buffers are still unused.
    Objects that are not a CannedObject, or that have no buffers, are
    left alone.
    """
    if not (isinstance(obj, CannedObject) and obj.buffers):
        return
    slots = obj.buffers
    for index in range(len(slots)):
        if slots[index] is None:
            slots[index] = buffers.pop(0)
MinRK
|
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Turn an object into a list of buffers that can be sent.

    Parameters
    ----------
    obj : object
        The object to be serialized.
    buffer_threshold : int
        Size in bytes above which data buffers are pulled out and sent
        separately instead of being pickled.
    item_threshold : int
        Containers (sequences, dicts) with at least this many items are
        canned as a single object rather than item by item.

    Returns
    -------
    list
        The pickled, canned object first, followed by every buffer that was
        extracted from it.
    """
    extracted = []
    small_sequence = istype(obj, sequence_types) and len(obj) < item_threshold
    small_dict = (
        not small_sequence
        and istype(obj, dict)
        and len(obj) < item_threshold
    )

    if small_sequence:
        canned = can_sequence(obj)
        for item in canned:
            extracted += _extract_buffers(item, buffer_threshold)
    elif small_dict:
        canned = {}
        # keys are visited in sorted order; deserialize_object walks them the
        # same way so the extracted buffers line up again
        for key in sorted(obj):
            item = can(obj[key])
            extracted += _extract_buffers(item, buffer_threshold)
            canned[key] = item
    else:
        canned = can(obj)
        extracted += _extract_buffers(canned, buffer_threshold)

    # pickle only after extraction, once the large buffers have been replaced by None
    return [pickle.dumps(canned, PICKLE_PROTOCOL)] + extracted
MinRK
|
def deserialize_object(buffers, g=None):
    """Rebuild an object from the buffers produced by serialize_object.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The pickled object first, followed by its extracted buffers; any
        further buffers are handed back untouched.  The input is copied, not
        modified.
    g : globals to be used when uncanning

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    remaining = list(buffers)
    # NOTE(review): pickle.loads runs on whatever arrives here; presumably the
    # buffers come from a trusted peer -- confirm against the caller.
    canned = pickle.loads(buffer_to_bytes_py2(remaining.pop(0)))

    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for item in canned:
            _restore_buffers(item, remaining)
        return uncan_sequence(canned, g), remaining

    if istype(canned, dict) and len(canned) < MAX_ITEMS:
        rebuilt = {}
        # same sorted key order as serialize_object, so buffers match up
        for key in sorted(canned):
            item = canned[key]
            _restore_buffers(item, remaining)
            rebuilt[key] = uncan(item, g)
        return rebuilt, remaining

    _restore_buffers(canned, remaining)
    return uncan(canned, g), remaining
MinRK
|
r6788 | |||
MinRK
|
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Pack a function, its args and its kwargs for sending over the wire.

    Each element of args/kwargs is serialized on its own with
    serialize_object, using the given ``buffer_threshold`` and
    ``item_threshold``.

    The message is a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    where ``cf`` is the pickled, canned function and ``pinfo`` is a pickled
    dict holding ``nargs``, ``narg_bufs`` and the sorted ``kw_keys``.  The
    list therefore has length at least two + len(args) + len(kwargs).
    """
    arg_bufs = []
    for arg in args:
        arg_bufs.extend(serialize_object(arg, buffer_threshold, item_threshold))

    # keyword values are sent in sorted-key order; the keys themselves travel in pinfo
    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = []
    for key in kw_keys:
        kwarg_bufs.extend(
            serialize_object(kwargs[key], buffer_threshold, item_threshold))

    info = {
        'nargs': len(args),
        'narg_bufs': len(arg_bufs),
        'kw_keys': kw_keys,
    }

    packed_f = pickle.dumps(can(f), PICKLE_PROTOCOL)
    packed_info = pickle.dumps(info, PICKLE_PROTOCOL)
    return [packed_f, packed_info] + arg_bufs + kwarg_bufs
def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    Parameters
    ----------
    bufs : list of buffers/bytes
        The message: pickled function, pickled info dict, then the argument
        and keyword-argument buffers.  The input is copied, not modified.
    g : globals to be used when uncanning
    copy : bool
        Accepted but not used by this function.

    Returns
    -------
    (f, args, kwargs) : the original function, a tuple of positional
        arguments, and a dict of keyword arguments.
    """
    bufs = list(bufs)  # private copy, so popping does not touch the caller's list
    assert len(bufs) >= 2, "not enough buffers!"

    packed_f = bufs.pop(0)
    f = uncan(pickle.loads(buffer_to_bytes_py2(packed_f)), g)
    packed_info = bufs.pop(0)
    info = pickle.loads(buffer_to_bytes_py2(packed_info))

    # the first narg_bufs buffers belong to args, everything after to kwargs
    split = info['narg_bufs']
    arg_bufs = bufs[:split]
    kwarg_bufs = bufs[split:]

    positional = []
    for _ in range(info['nargs']):
        value, arg_bufs = deserialize_object(arg_bufs, g)
        positional.append(value)
    args = tuple(positional)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for name in info['kw_keys']:
        value, kwarg_bufs = deserialize_object(kwarg_bufs, g)
        kwargs[name] = value
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, args, kwargs