serialize.py
180 lines
| 5.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r6788 | """serialization utilities for apply messages | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
# Standard library imports | ||||
import logging | ||||
import os | ||||
import re | ||||
import socket | ||||
import sys | ||||
# Prefer the C-accelerated pickler when it exists; otherwise use the
# pure-Python/standard ``pickle`` module under the same name.
try:
    import cPickle
    pickle = cPickle
except ImportError:
    # Only a missing module should trigger the fallback; a bare ``except``
    # would also hide KeyboardInterrupt/SystemExit and unrelated errors.
    cPickle = None
    import pickle
# IPython imports | ||||
from IPython.utils import py3compat | ||||
MinRK
|
r7967 | from IPython.utils.pickleutil import ( | ||
can, uncan, can_sequence, uncan_sequence, CannedObject | ||||
MinRK
|
r7969 | ) | ||
MinRK
|
r6788 | |||
if py3compat.PY3:
    # When py3compat reports Python 3, make the name ``buffer`` refer to
    # memoryview so the isinstance(..., buffer) checks in this module work.
    buffer = memoryview
#----------------------------------------------------------------------------- | ||||
# Serialization Functions | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
# default values for the thresholds:
# lists, tuples and dicts with fewer than MAX_ITEMS entries are canned
# element by element; larger containers are canned as a single object
MAX_ITEMS = 64
# data buffers larger than MAX_BYTES are pulled out of the canned object and
# sent as separate message parts instead of being pickled
MAX_BYTES = 1024
MinRK
|
r7967 | |||
MinRK
|
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull large data buffers out of a canned object so they are not pickled.

    Parameters
    ----------
    obj : object
        Usually a CannedObject. Anything else, or a CannedObject without
        buffers, is left untouched.
    threshold : int
        Size in bytes above which a buffer is extracted.

    Returns
    -------
    buffers : list
        The extracted buffers, in the order they appeared in ``obj.buffers``.
        Every extracted slot of ``obj.buffers`` is replaced by None; smaller
        ``buffer`` instances are replaced in place by their bytes.
    """
    buffers = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            # len() of a memoryview counts items along the first dimension,
            # not bytes, so prefer ``nbytes`` whenever the buffer offers it.
            nbytes = getattr(buf, 'nbytes', None)
            if nbytes is None:
                nbytes = len(buf)
            if nbytes > threshold:
                # buffer larger than threshold, prevent pickling
                obj.buffers[i] = None
                buffers.append(buf)
            elif isinstance(buf, buffer):
                # buffer too small for separate send, coerce to bytes
                # because pickling buffer objects just results in broken pointers
                obj.buffers[i] = bytes(buf)
    return buffers
def _restore_buffers(obj, buffers):
    """Put back the buffers that _extract_buffers removed from a canned object.

    Every None slot in ``obj.buffers`` is filled with the next item popped
    from the front of ``buffers``, so ``buffers`` is consumed in place.
    Objects that are not a CannedObject, or that carry no buffers, are left
    untouched and nothing is consumed.
    """
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return
    for index, slot in enumerate(obj.buffers):
        if slot is not None:
            continue
        # None marks a slot whose data travelled as a separate buffer
        obj.buffers[index] = buffers.pop(0)
MinRK
|
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------
    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Containers (lists, dicts) larger than this will be pickled without
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
        The first element is the pickle of the canned object; any data
        buffers that were extracted from it follow in order.
    """
    buffers = []
    if isinstance(obj, (list, tuple)) and len(obj) < item_threshold:
        cobj = can_sequence(obj)
        for c in cobj:
            buffers.extend(_extract_buffers(c, buffer_threshold))
    elif isinstance(obj, dict) and len(obj) < item_threshold:
        cobj = {}
        # Walk the keys in sorted order so the extracted buffers come out in
        # a reproducible sequence that the receiving side can replay.
        # sorted(obj) iterates the keys on both Python 2 and Python 3,
        # unlike the Python-2-only obj.iterkeys().
        for k in sorted(obj):
            c = can(obj[k])
            buffers.extend(_extract_buffers(c, buffer_threshold))
            cobj[k] = c
    else:
        cobj = can(obj)
        buffers.extend(_extract_buffers(cobj, buffer_threshold))

    # -1 selects the highest pickle protocol available
    buffers.insert(0, pickle.dumps(cobj, -1))
    return buffers
def unserialize_object(buffers, g=None):
    """reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The pickled canned object first, followed by any separately sent
        data buffers. The list itself is not modified; a copy is consumed.
    g : globals to be used when uncanning

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    # NOTE(review): pickle.loads executes whatever the pickle describes, so
    # this must only ever be fed buffers from a trusted peer.
    canned = pickle.loads(bufs.pop(0))
    # NOTE(review): the cut-off here is always MAX_ITEMS, whereas
    # serialize_object accepts a custom item_threshold; a container canned
    # element-wise with a larger threshold takes the generic branch below.
    if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
        for c in canned:
            _restore_buffers(c, bufs)
        newobj = uncan_sequence(canned, g)
    elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
        newobj = {}
        # Same sorted key order as serialize_object, so each value gets back
        # the buffers that were extracted from it. sorted(canned) iterates
        # keys on Python 2 and 3, unlike the Python-2-only iterkeys().
        for k in sorted(canned):
            c = canned[k]
            _restore_buffers(c, bufs)
            newobj[k] = uncan(c, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
MinRK
|
r6788 | |||
MinRK
|
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Pack a function, its args and its kwargs into a list of buffers.

    The returned list is laid out as::

        [pickled canned f, pickled canned args, pickled canned kwargs,
         data buffers of args..., data buffers of kwargs...]

    Data buffers larger than `buffer_threshold` are sent as separate
    elements rather than being pickled; `item_threshold` is forwarded to
    serialize_object to bound how many container items are inspected.
    """
    pickled_f = pickle.dumps(can(f), -1)
    arg_parts = serialize_object(args, buffer_threshold, item_threshold)
    kwarg_parts = serialize_object(kwargs, buffer_threshold, item_threshold)

    # the three pickles lead, the large data buffers trail behind them
    message = [pickled_f, arg_parts[0], kwarg_parts[0]]
    message.extend(arg_parts[1:])
    message.extend(kwarg_parts[1:])
    return message
def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    Parameters
    ----------
    bufs : list of buffers
        At least three elements: the pickled function, pickled args and
        pickled kwargs, followed by any separately sent data buffers.
    g : globals to be used when uncanning
    copy : bool
        When False, the first three elements are replaced by their
        ``.bytes`` attribute before unpickling.

    Returns
    -------
    (f, args, kwargs) : the original function, args and kwargs.
    """
    frames = list(bufs)  # private copy, so popping does not affect the caller
    assert len(frames) >= 3, "not enough buffers!"
    if not copy:
        frames[:3] = [frame.bytes for frame in frames[:3]]

    f = uncan(pickle.loads(frames.pop(0)), g)

    # Set the kwargs pickle aside so the args pickle is directly followed by
    # its data buffers; whatever unserialize_object leaves over is kwargs data.
    kwargs_pickle = frames.pop(1)
    args, frames = unserialize_object(frames, g)

    # put the kwargs pickle back at the front of the leftover buffers
    frames.insert(0, kwargs_pickle)
    kwargs, frames = unserialize_object(frames, g)

    assert not frames, "Shouldn't be any data left over"

    return f, args, kwargs