##// END OF EJS Templates
add engine_info to execute errors
add engine_info to execute errors

File last commit:

r6788:ed57dafb
r7038:d72b23cb
Show More
serialize.py
179 lines | 5.7 KiB | text/x-python | PythonLexer
MinRK
move apply serialization into zmq.serialize
"""Serialization utilities for apply messages.

Authors:

* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports
import logging
import os
import re
import socket
import sys
try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
# IPython imports
from IPython.utils import py3compat
from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
from IPython.utils.newserialized import serialize, unserialize
if py3compat.PY3:
buffer = memoryview
#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------
def _detach_large_data(sobj, threshold, databuffers):
    """Move the payload of `sobj` into `databuffers` when it should be sent separately.

    The payload is detached when the serialized object's type descriptor is
    'buffer' or 'ndarray', or when `sobj.getDataSize()` exceeds `threshold`.
    A detached object has its `data` attribute set to None, which is the
    marker the unpacking side uses to know it must pull the next buffer.
    """
    if sobj.typeDescriptor in ('buffer', 'ndarray') or sobj.getDataSize() > threshold:
        databuffers.append(sobj.getData())
        sobj.data = None


def serialize_object(obj, threshold=64e-6):
    """Serialize an object into a list of sendable buffers.

    Lists/tuples and dicts are serialized element-wise, so that each element
    can have its data sent as a separate buffer; any other object is
    serialized as a single item.

    Parameters
    ----------

    obj : object
        The object to be serialized
    threshold : float
        The threshold for not double-pickling the content. It is compared
        against `getDataSize()` of each serialized item (units are whatever
        `getDataSize()` reports -- TODO confirm against newserialized).

    Returns
    -------
    ('pmd', [bufs]) :
        where pmd is the pickled metadata wrapper,
        bufs is a list of data buffers
    """
    databuffers = []
    if isinstance(obj, (list, tuple)):
        # Build a real list: on Python 3 `map` returns a one-shot iterator,
        # which would be exhausted by the loop below and cannot be pickled.
        slist = [serialize(c) for c in canSequence(obj)]
        for s in slist:
            _detach_large_data(s, threshold, databuffers)
        return pickle.dumps(slist, -1), databuffers
    elif isinstance(obj, dict):
        sobj = {}
        # Sorted key order makes the buffer order deterministic, so the
        # receiver (which also iterates keys sorted) can match them up.
        # `sorted(obj)` iterates keys on both Python 2 and 3.
        for k in sorted(obj):
            s = serialize(can(obj[k]))
            _detach_large_data(s, threshold, databuffers)
            sobj[k] = s
        return pickle.dumps(sobj, -1), databuffers
    else:
        s = serialize(can(obj))
        _detach_large_data(s, threshold, databuffers)
        return pickle.dumps(s, -1), databuffers
def unserialize_object(bufs):
    """Reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------

    bufs : iterable of buffers
        The first buffer is the pickled metadata wrapper; the following
        buffers hold the data of any item whose `data` was detached
        (set to None) during serialization, in the order they were emitted.

    Returns
    -------
    (obj, bufs) :
        where obj is the reconstructed object (a list for list/tuple input,
        a dict for dict input, otherwise a single object),
        bufs is the list of buffers that were not consumed
    """
    bufs = list(bufs)  # copy, so we can pop without mutating the caller's sequence
    # NOTE(security): pickle.loads can execute arbitrary code; these buffers
    # must only ever come from a trusted peer.
    sobj = pickle.loads(bufs.pop(0))
    if isinstance(sobj, (list, tuple)):
        for s in sobj:
            if s.data is None:
                s.data = bufs.pop(0)
        # Pass a real list: on Python 3 `map` would hand uncanSequence a lazy
        # iterator instead of a sequence.
        return uncanSequence([unserialize(s) for s in sobj]), bufs
    elif isinstance(sobj, dict):
        newobj = {}
        # Keys are visited in sorted order to mirror serialize_object, so the
        # detached buffers are re-attached to the right values.
        for k in sorted(sobj):
            s = sobj[k]
            if s.data is None:
                s.data = bufs.pop(0)
            newobj[k] = uncan(unserialize(s))
        return newobj, bufs
    else:
        if sobj.data is None:
            sobj.data = bufs.pop(0)
        return uncan(unserialize(sobj)), bufs
def pack_apply_message(f, args, kwargs, threshold=64e-6):
    """Pack a function, its args and its kwargs into a flat list of buffers.

    The resulting message layout is::

        [pickled canned f, pickled args metadata, pickled kwargs metadata,
         <data buffers of args...>, <data buffers of kwargs...>]

    `threshold` is forwarded to serialize_object: any item whose data is
    larger than `threshold` is shipped as its own buffer rather than being
    embedded in the pickled metadata (currently only numpy arrays support
    zero-copy).
    """
    pickled_f = pickle.dumps(can(f), -1)
    pickled_args, arg_bufs = serialize_object(args, threshold)
    pickled_kwargs, kwarg_bufs = serialize_object(kwargs, threshold)

    # Three fixed header frames first, then the large-object buffers in the
    # order they were produced (args before kwargs).
    header = [pickled_f, pickled_args, pickled_kwargs]
    return header + list(arg_bufs) + list(kwarg_bufs)
def _restore_data(sobj, bufs, copy):
    """Re-attach the next buffer from `bufs` to `sobj` if its data was detached.

    Does nothing when `sobj.data` is already set. Otherwise pops the first
    entry of `bufs` (mutating it) and stores it on `sobj`:

    * 'buffer'/'ndarray' payloads are always wrapped with `buffer(...)`,
      until memoryviews get sorted out;
    * other payloads are stored as-is when `copy` is true, or via the
      message's `.bytes` attribute when `copy` is false.
    """
    if sobj.data is not None:
        return
    m = bufs.pop(0)
    if sobj.getTypeDescriptor() in ('buffer', 'ndarray'):
        sobj.data = buffer(m)
    elif copy:
        sobj.data = m
    else:
        sobj.data = m.bytes


def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    Parameters
    ----------

    bufs : iterable of buffers
        At least three entries: pickled function, pickled args metadata,
        pickled kwargs metadata, followed by any detached data buffers.
    g : dict, optional
        Passed through to uncan/uncanSequence (presumably the namespace in
        which canned objects are restored -- confirm against pickleutil).
    copy : bool
        When false, entries of `bufs` are expected to expose their content
        via a `.bytes` attribute (presumably non-copying zmq frames -- TODO
        confirm with callers).

    Returns
    -------
    (f, args, kwargs) : the original function, positional args and keyword args
    """
    bufs = list(bufs)  # allow us to pop
    # Kept as an assert so callers see the same AssertionError as before.
    assert len(bufs) >= 3, "not enough buffers!"
    if not copy:
        # The three header frames must be plain bytes before unpickling.
        for i in range(3):
            bufs[i] = bufs[i].bytes
    # NOTE(security): pickle.loads can execute arbitrary code; these buffers
    # must only ever come from a trusted peer.
    cf = pickle.loads(bufs.pop(0))
    sargs = list(pickle.loads(bufs.pop(0)))
    skwargs = dict(pickle.loads(bufs.pop(0)))
    f = uncan(cf, g)

    for sa in sargs:
        _restore_data(sa, bufs, copy)
    # Build a real list: on Python 3 `map` would hand uncanSequence a lazy
    # iterator instead of a sequence.
    args = uncanSequence([unserialize(sa) for sa in sargs], g)

    kwargs = {}
    # Sorted key order mirrors serialize_object, so buffers are re-attached
    # to the values they were detached from. `sorted(skwargs)` iterates keys
    # on both Python 2 and 3.
    for k in sorted(skwargs):
        sa = skwargs[k]
        _restore_data(sa, bufs, copy)
        kwargs[k] = uncan(unserialize(sa), g)

    return f, args, kwargs