##// END OF EJS Templates
add locks to update everywhere by using options to pass this...
add locks to update everywhere by using options to pass this (and check for this)

File last commit:

r13360:f4093495
r14570:4e85339b
Show More
serialize.py
198 lines | 6.3 KiB | text/x-python | PythonLexer
"""serialization utilities for apply messages
Authors:
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
# IPython imports
from IPython.utils import py3compat
from IPython.utils.data import flatten
from IPython.utils.pickleutil import (
can, uncan, can_sequence, uncan_sequence, CannedObject,
istype, sequence_types,
)
if py3compat.PY3:
buffer = memoryview
#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------
# default values for the thresholds:
MAX_ITEMS = 64
MAX_BYTES = 1024
def _extract_buffers(obj, threshold=MAX_BYTES):
"""extract buffers larger than a certain threshold"""
buffers = []
if isinstance(obj, CannedObject) and obj.buffers:
for i,buf in enumerate(obj.buffers):
if len(buf) > threshold:
# buffer larger than threshold, prevent pickling
obj.buffers[i] = None
buffers.append(buf)
elif isinstance(buf, buffer):
# buffer too small for separate send, coerce to bytes
# because pickling buffer objects just results in broken pointers
obj.buffers[i] = bytes(buf)
return buffers
def _restore_buffers(obj, buffers):
"""restore buffers extracted by """
if isinstance(obj, CannedObject) and obj.buffers:
for i,buf in enumerate(obj.buffers):
if buf is None:
obj.buffers[i] = buffers.pop(0)
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
"""Serialize an object into a list of sendable buffers.
Parameters
----------
obj : object
The object to be serialized
buffer_threshold : int
The threshold (in bytes) for pulling out data buffers
to avoid pickling them.
item_threshold : int
The maximum number of items over which canning will iterate.
Containers (lists, dicts) larger than this will be pickled without
introspection.
Returns
-------
[bufs] : list of buffers representing the serialized object.
"""
buffers = []
if istype(obj, sequence_types) and len(obj) < item_threshold:
cobj = can_sequence(obj)
for c in cobj:
buffers.extend(_extract_buffers(c, buffer_threshold))
elif istype(obj, dict) and len(obj) < item_threshold:
cobj = {}
for k in sorted(obj):
c = can(obj[k])
buffers.extend(_extract_buffers(c, buffer_threshold))
cobj[k] = c
else:
cobj = can(obj)
buffers.extend(_extract_buffers(cobj, buffer_threshold))
buffers.insert(0, pickle.dumps(cobj,-1))
return buffers
def unserialize_object(buffers, g=None):
"""reconstruct an object serialized by serialize_object from data buffers.
Parameters
----------
bufs : list of buffers/bytes
g : globals to be used when uncanning
Returns
-------
(newobj, bufs) : unpacked object, and the list of remaining unused buffers.
"""
bufs = list(buffers)
pobj = bufs.pop(0)
if not isinstance(pobj, bytes):
# a zmq message
pobj = bytes(pobj)
canned = pickle.loads(pobj)
if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
for c in canned:
_restore_buffers(c, bufs)
newobj = uncan_sequence(canned, g)
elif istype(canned, dict) and len(canned) < MAX_ITEMS:
newobj = {}
for k in sorted(canned):
c = canned[k]
_restore_buffers(c, bufs)
newobj[k] = uncan(c, g)
else:
_restore_buffers(canned, bufs)
newobj = uncan(canned, g)
return newobj, bufs
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
"""pack up a function, args, and kwargs to be sent over the wire
Each element of args/kwargs will be canned for special treatment,
but inspection will not go any deeper than that.
Any object whose data is larger than `threshold` will not have their data copied
(only numpy arrays and bytes/buffers support zero-copy)
Message will be a list of bytes/buffers of the format:
[ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
With length at least two + len(args) + len(kwargs)
"""
arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
kw_keys = sorted(kwargs.keys())
kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
msg = [pickle.dumps(can(f),-1)]
msg.append(pickle.dumps(info, -1))
msg.extend(arg_bufs)
msg.extend(kwarg_bufs)
return msg
def unpack_apply_message(bufs, g=None, copy=True):
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
Returns: original f,args,kwargs"""
bufs = list(bufs) # allow us to pop
assert len(bufs) >= 2, "not enough buffers!"
if not copy:
for i in range(2):
bufs[i] = bufs[i].bytes
f = uncan(pickle.loads(bufs.pop(0)), g)
info = pickle.loads(bufs.pop(0))
arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
args = []
for i in range(info['nargs']):
arg, arg_bufs = unserialize_object(arg_bufs, g)
args.append(arg)
args = tuple(args)
assert not arg_bufs, "Shouldn't be any arg bufs left over"
kwargs = {}
for key in info['kw_keys']:
kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
kwargs[key] = kwarg
assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
return f,args,kwargs