serialize.py
198 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r6788 | """serialization utilities for apply messages | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
try:
    # Prefer the C-accelerated implementation where it exists (Python 2).
    import cPickle
    pickle = cPickle
except ImportError:
    # Only a missing module should trigger the fallback; any other error
    # raised while importing cPickle is a real problem and must propagate.
    cPickle = None
    import pickle
# IPython imports | ||||
from IPython.utils import py3compat | ||||
MinRK
|
r8133 | from IPython.utils.data import flatten | ||
MinRK
|
r7967 | from IPython.utils.pickleutil import ( | ||
MinRK
|
r9712 | can, uncan, can_sequence, uncan_sequence, CannedObject, | ||
istype, sequence_types, | ||||
MinRK
|
r7969 | ) | ||
MinRK
|
r6788 | |||
if py3compat.PY3:
    # Python 3 has no ``buffer`` builtin; alias the name to ``memoryview``
    # so the isinstance checks in this module have something to test against.
    buffer = memoryview

#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------

# Default values for the thresholds.

# Sequences and dicts with fewer than MAX_ITEMS entries are canned element by
# element; anything else is canned as a single object.
MAX_ITEMS = 64

# Buffers whose length exceeds MAX_BYTES are pulled out of canned objects and
# sent alongside the pickle instead of inside it.
MAX_BYTES = 1024
MinRK
|
r7967 | |||
MinRK
|
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull the large data buffers out of a canned object.

    Only ``CannedObject`` instances with a non-empty ``buffers`` list are
    touched; for anything else an empty list is returned.

    Parameters
    ----------
    obj : object
        The (possibly canned) object to inspect.  Its ``buffers`` list is
        modified in place.
    threshold : int
        Buffers whose ``len()`` is greater than this are extracted.

    Returns
    -------
    list : the extracted buffers, in the order they appeared in
        ``obj.buffers``.  Each extracted slot is left as ``None``.
    """
    extracted = []
    if not (isinstance(obj, CannedObject) and obj.buffers):
        return extracted

    for index, data in enumerate(obj.buffers):
        if len(data) > threshold:
            # too big to pickle: leave a None placeholder and ship it separately
            obj.buffers[index] = None
            extracted.append(data)
            continue
        if isinstance(data, buffer):
            # small enough to travel inside the pickle, but buffer objects
            # do not pickle usefully (broken pointers), so coerce to bytes
            obj.buffers[index] = bytes(data)
    return extracted
def _restore_buffers(obj, buffers):
    """Put buffers removed by ``_extract_buffers`` back into a canned object.

    Every ``None`` placeholder in ``obj.buffers`` is replaced, in order, by
    the next item popped from the front of `buffers`.

    Parameters
    ----------
    obj : object
        Only ``CannedObject`` instances with a non-empty ``buffers`` list
        are modified; anything else is ignored.
    buffers : list
        Pending buffers.  Consumed in place from the front, so the caller
        sees only the unused remainder afterwards.
    """
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return
    for index, placeholder in enumerate(obj.buffers):
        if placeholder is None:
            obj.buffers[index] = buffers.pop(0)
MinRK
|
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------
    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Sequences and dicts with `item_threshold` or more items are canned
        as a single object instead of element by element.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
        The first entry is the pickle of the canned object; any buffers
        that were extracted from it follow.
    """
    buffers = []
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        cobj = can_sequence(obj)
        for c in cobj:
            buffers.extend(_extract_buffers(c, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        cobj = {}
        # Walk the keys in sorted order so the extracted buffers line up with
        # the order in which unserialize_object restores them.  Iterating the
        # dict directly (rather than calling iterkeys()) works on both
        # Python 2 and Python 3.
        for k in sorted(obj):
            c = can(obj[k])
            buffers.extend(_extract_buffers(c, buffer_threshold))
            cobj[k] = c
    else:
        cobj = can(obj)
        buffers.extend(_extract_buffers(cobj, buffer_threshold))

    # Pickle only after extraction: the extracted buffers have been replaced
    # by None placeholders, so they are not copied into the pickle.
    buffers.insert(0, pickle.dumps(cobj, -1))
    return buffers
def unserialize_object(buffers, g=None):
    """reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The first entry is the pickled, canned object; the entries after it
        are the data buffers that were extracted from it.  The list passed
        in is not modified (a copy is consumed).
    g : globals to be used when uncanning

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    pobj = bufs.pop(0)
    if not isinstance(pobj, bytes):
        # a zmq message
        pobj = bytes(pobj)
    # SECURITY: unpickling executes arbitrary code chosen by whoever produced
    # the data; this must only ever be fed buffers from a trusted peer.
    canned = pickle.loads(pobj)
    # NOTE: the size checks use the module default MAX_ITEMS, not whatever
    # item_threshold was passed to serialize_object on the sending side.
    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for c in canned:
            _restore_buffers(c, bufs)
        newobj = uncan_sequence(canned, g)
    elif istype(canned, dict) and len(canned) < MAX_ITEMS:
        newobj = {}
        # Same sorted key order as serialize_object, so buffers are handed
        # back to the values they were taken from.  Iterating the dict
        # directly (rather than calling iterkeys()) works on both Python 2
        # and Python 3.
        for k in sorted(canned):
            c = canned[k]
            _restore_buffers(c, bufs)
            newobj[k] = uncan(c, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
MinRK
|
r6788 | |||
MinRK
|
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Pack a function, its args and its kwargs for sending over the wire.

    Each element of args/kwargs is serialized on its own with
    serialize_object, so it is canned for special treatment, but inspection
    goes no deeper than that.

    Any object whose data is larger than `buffer_threshold` will not have its
    data copied (only numpy arrays and bytes/buffers support zero-copy).

    The message is a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    where ``cf`` is the pickled canned function and ``pinfo`` is a pickled
    dict holding ``nargs``, ``narg_bufs`` and the sorted ``kw_keys``.
    Its length is at least two + len(args) + len(kwargs).
    """
    def _serialize(value):
        # one positional or keyword value -> list of buffers
        return serialize_object(value, buffer_threshold, item_threshold)

    arg_bufs = flatten(_serialize(arg) for arg in args)

    # keyword values travel in sorted-key order; the keys themselves go in pinfo
    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = flatten(_serialize(kwargs[name]) for name in kw_keys)

    info = {
        'nargs': len(args),
        'narg_bufs': len(arg_bufs),
        'kw_keys': kw_keys,
    }

    pickled_f = pickle.dumps(can(f), -1)
    pickled_info = pickle.dumps(info, -1)

    msg = [pickled_f, pickled_info]
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)
    return msg
def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    Parameters
    ----------
    bufs : list of buffers/bytes
        The message, laid out as [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ].
        The list passed in is not modified.
    g : globals to be used when uncanning
    copy : bool
        When False, the first two entries are replaced by their ``.bytes``
        attribute before unpickling (presumably they are zmq frames in that
        case -- confirm against the caller).

    Returns
    -------
    (f, args, kwargs) : the original function, a tuple of positional
        arguments, and a dict of keyword arguments.
    """
    frames = list(bufs)  # private copy, safe to rewrite
    assert len(frames) >= 2, "not enough buffers!"
    if not copy:
        for position in (0, 1):
            frames[position] = frames[position].bytes

    pickled_f, pickled_info = frames[0], frames[1]
    payload = frames[2:]

    f = uncan(pickle.loads(pickled_f), g)
    info = pickle.loads(pickled_info)

    # the leading narg_bufs entries belong to args, the remainder to kwargs
    split_at = info['narg_bufs']
    arg_bufs = payload[:split_at]
    kwarg_bufs = payload[split_at:]

    positional = []
    for _ in range(info['nargs']):
        value, arg_bufs = unserialize_object(arg_bufs, g)
        positional.append(value)
    args = tuple(positional)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for key in info['kw_keys']:
        value, kwarg_bufs = unserialize_object(kwarg_bufs, g)
        kwargs[key] = value
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, args, kwargs