r17044 """serialization utilities for apply messages"""
MinRK
move apply serialization into zmq.serialize
r6788
MinRK
add pickleutil.PICKLE_PROTOCOL...
r17044 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
move apply serialization into zmq.serialize
r6788
try:
    import cPickle
    pickle = cPickle
except ImportError:
    cPickle = None
    import pickle

# IPython imports
from IPython.utils.py3compat import PY3, buffer_to_bytes_py2
from IPython.utils.data import flatten
from IPython.utils.pickleutil import (
    can, uncan, can_sequence, uncan_sequence, CannedObject,
    istype, sequence_types, PICKLE_PROTOCOL,
)
from jupyter_client.session import MAX_ITEMS, MAX_BYTES

if PY3:
    buffer = memoryview

#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------

def _extract_buffers(obj, threshold=MAX_BYTES):
    """extract buffers larger than a certain threshold"""
    buffers = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            if len(buf) > threshold:
                # buffer larger than threshold, prevent pickling
                obj.buffers[i] = None
                buffers.append(buf)
            elif isinstance(buf, buffer):
                # buffer too small for separate send, coerce to bytes
                # because pickling buffer objects just results in broken pointers
                obj.buffers[i] = bytes(buf)
    return buffers

def _restore_buffers(obj, buffers):
    """restore buffers extracted by _extract_buffers"""
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            if buf is None:
                obj.buffers[i] = buffers.pop(0)

def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------
    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Containers (lists, dicts) larger than this will be pickled without
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
    """
    buffers = []
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        cobj = can_sequence(obj)
        for c in cobj:
            buffers.extend(_extract_buffers(c, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        cobj = {}
        for k in sorted(obj):
            c = can(obj[k])
            buffers.extend(_extract_buffers(c, buffer_threshold))
            cobj[k] = c
    else:
        cobj = can(obj)
        buffers.extend(_extract_buffers(cobj, buffer_threshold))

    buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
    return buffers

def deserialize_object(buffers, g=None):
    """reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    buffers : list of buffers/bytes
    g : globals to be used when uncanning

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    pobj = buffer_to_bytes_py2(bufs.pop(0))
    canned = pickle.loads(pobj)
    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for c in canned:
            _restore_buffers(c, bufs)
        newobj = uncan_sequence(canned, g)
    elif istype(canned, dict) and len(canned) < MAX_ITEMS:
        newobj = {}
        for k in sorted(canned):
            c = canned[k]
            _restore_buffers(c, bufs)
            newobj[k] = uncan(c, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
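

# Illustrative sketch, not part of the original module: round-trip an object
# through serialize_object/deserialize_object. With the byte-string canning
# provided by IPython.utils.pickleutil, a payload larger than buffer_threshold
# is expected to travel as its own buffer after the leading pickle frame
# rather than being embedded in it. The function name is hypothetical.
def _example_serialize_roundtrip():
    payload = b"x" * (1024 * 1024)   # 1 MB payload, well above the 1 KB threshold below
    obj = {'data': payload, 'n': 42}
    bufs = serialize_object(obj, buffer_threshold=1024)
    # bufs[0] is the pickle of the canned container; the large payload should
    # follow as a separate buffer, ready for zero-copy sends over zmq.
    newobj, remaining = deserialize_object(bufs)
    assert newobj == obj
    assert remaining == []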


def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """pack up a function, args, and kwargs to be sent over the wire

    Each element of args/kwargs will be canned for special treatment,
    but inspection will not go any deeper than that.

    Any object whose data is larger than `threshold` will not have its data copied
    (only numpy arrays and bytes/buffers support zero-copy).

    Message will be a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    With length at least two + len(args) + len(kwargs)
    """

    arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)

    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)

    info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)

    msg = [pickle.dumps(can(f), PICKLE_PROTOCOL)]
    msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)

    return msg

def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()

    Returns: original f,args,kwargs
    """
    bufs = list(bufs)  # allow us to pop
    assert len(bufs) >= 2, "not enough buffers!"
    pf = buffer_to_bytes_py2(bufs.pop(0))
    f = uncan(pickle.loads(pf), g)
    pinfo = buffer_to_bytes_py2(bufs.pop(0))
    info = pickle.loads(pinfo)
    arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]

    args = []
    for i in range(info['nargs']):
        arg, arg_bufs = deserialize_object(arg_bufs, g)
        args.append(arg)
    args = tuple(args)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for key in info['kw_keys']:
        kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
        kwargs[key] = kwarg
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, args, kwargs
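

# Illustrative sketch, not part of the original module: a purely local round
# trip through pack_apply_message/unpack_apply_message. In real use the
# message list is shipped over zmq to an engine; here it is unpacked in the
# same process. Using operator.add keeps the example independent of canning
# interactively-defined functions. The function name is hypothetical.
def _example_apply_roundtrip():
    import operator
    msg = pack_apply_message(operator.add, (2, 3), {})
    # msg == [pickled canned f, pickled info dict, <arg buffers>...];
    # there are no kwarg buffers because kwargs is empty.
    f, args, kwargs = unpack_apply_message(msg)
    assert f(*args, **kwargs) == 5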