##// END OF EJS Templates
better serialization for parallel code...
MinRK -
Show More
@@ -0,0 +1,115 b''
1 """test serialization tools"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import pickle
15
16 import nose.tools as nt
17
18 # from unittest import TestCaes
19 from IPython.zmq.serialize import serialize_object, unserialize_object
20 from IPython.testing import decorators as dec
21 from IPython.utils.pickleutil import CannedArray
22
23 def roundtrip(obj):
24 """roundtrip an object through serialization"""
25 bufs = serialize_object(obj)
26 obj2, remainder = unserialize_object(bufs)
27 nt.assert_equals(remainder, [])
28 return obj2
29
30 class C(object):
31 """dummy class for """
32
33 def __init__(self, **kwargs):
34 for key,value in kwargs.iteritems():
35 setattr(self, key, value)
36
37 @dec.parametric
38 def test_roundtrip_simple():
39 for obj in [
40 'hello',
41 dict(a='b', b=10),
42 [1,2,'hi'],
43 (b'123', 'hello'),
44 ]:
45 obj2 = roundtrip(obj)
46 yield nt.assert_equals(obj, obj2)
47
48 @dec.parametric
49 def test_roundtrip_nested():
50 for obj in [
51 dict(a=range(5), b={1:b'hello'}),
52 [range(5),[range(3),(1,[b'whoda'])]],
53 ]:
54 obj2 = roundtrip(obj)
55 yield nt.assert_equals(obj, obj2)
56
57 @dec.parametric
58 def test_roundtrip_buffered():
59 for obj in [
60 dict(a=b"x"*1025),
61 b"hello"*500,
62 [b"hello"*501, 1,2,3]
63 ]:
64 bufs = serialize_object(obj)
65 yield nt.assert_equals(len(bufs), 2)
66 obj2, remainder = unserialize_object(bufs)
67 yield nt.assert_equals(remainder, [])
68 yield nt.assert_equals(obj, obj2)
69
70 @dec.parametric
71 @dec.skip_without('numpy')
72 def test_numpy():
73 import numpy
74 from numpy.testing.utils import assert_array_equal
75 for shape in ((), (0,), (100,), (1024,10), (10,8,6,5)):
76 for dtype in ('uint8', 'float64', 'int32', [('int16', 'float32')]):
77 A = numpy.empty(shape, dtype=dtype)
78 bufs = serialize_object(A)
79 B, r = unserialize_object(bufs)
80 yield nt.assert_equals(r, [])
81 yield assert_array_equal(A,B)
82
83 @dec.parametric
84 @dec.skip_without('numpy')
85 def test_numpy_in_seq():
86 import numpy
87 from numpy.testing.utils import assert_array_equal
88 for shape in ((), (0,), (100,), (1024,10), (10,8,6,5)):
89 for dtype in ('uint8', 'float64', 'int32', [('int16', 'float32')]):
90 A = numpy.empty(shape, dtype=dtype)
91 bufs = serialize_object((A,1,2,b'hello'))
92 canned = pickle.loads(bufs[0])
93 yield nt.assert_true(canned[0], CannedArray)
94 tup, r = unserialize_object(bufs)
95 B = tup[0]
96 yield nt.assert_equals(r, [])
97 yield assert_array_equal(A,B)
98
99 @dec.parametric
100 @dec.skip_without('numpy')
101 def test_numpy_in_dict():
102 import numpy
103 from numpy.testing.utils import assert_array_equal
104 for shape in ((), (0,), (100,), (1024,10), (10,8,6,5)):
105 for dtype in ('uint8', 'float64', 'int32', [('int16', 'float32')]):
106 A = numpy.empty(shape, dtype=dtype)
107 bufs = serialize_object(dict(a=A,b=1,c=range(20)))
108 canned = pickle.loads(bufs[0])
109 yield nt.assert_true(canned['a'], CannedArray)
110 d, r = unserialize_object(bufs)
111 B = d['a']
112 yield nt.assert_equals(r, [])
113 yield assert_array_equal(A,B)
114
115
@@ -239,7 +239,7 b' class TestView(ClusterTestCase, ParametricTestCase):'
239 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
239 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
240 view = self.client[:]
240 view = self.client[:]
241 a = numpy.arange(64)
241 a = numpy.arange(64)
242 view.scatter('a', a)
242 view.scatter('a', a, block=True)
243 b = view.gather('a', block=True)
243 b = view.gather('a', block=True)
244 assert_array_equal(b, a)
244 assert_array_equal(b, a)
245
245
@@ -325,7 +325,7 b' class TestView(ClusterTestCase, ParametricTestCase):'
325 r = view.map_sync(lambda x:x, arr)
325 r = view.map_sync(lambda x:x, arr)
326 self.assertEqual(r, list(arr))
326 self.assertEqual(r, list(arr))
327
327
328 def test_scatterGatherNonblocking(self):
328 def test_scatter_gather_nonblocking(self):
329 data = range(16)
329 data = range(16)
330 view = self.client[:]
330 view = self.client[:]
331 view.scatter('a', data, block=False)
331 view.scatter('a', data, block=False)
@@ -43,17 +43,11 b' from IPython.external.decorator import decorator'
43
43
44 # IPython imports
44 # IPython imports
45 from IPython.config.application import Application
45 from IPython.config.application import Application
46 from IPython.utils import py3compat
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 from IPython.utils.newserialized import serialize, unserialize
49 from IPython.zmq.log import EnginePUBHandler
46 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
47 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
48 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
49 )
53
50
54 if py3compat.PY3:
55 buffer = memoryview
56
57 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
58 # Classes
52 # Classes
59 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
@@ -19,7 +19,22 b' import copy'
19 import sys
19 import sys
20 from types import FunctionType
20 from types import FunctionType
21
21
22 try:
23 import cPickle as pickle
24 except ImportError:
25 import pickle
26
27 try:
28 import numpy
29 except:
30 numpy = None
31
22 import codeutil
32 import codeutil
33 import py3compat
34 from importstring import import_item
35
36 if py3compat.PY3:
37 buffer = memoryview
23
38
24 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
25 # Classes
40 # Classes
@@ -32,14 +47,16 b' class CannedObject(object):'
32 self.obj = copy.copy(obj)
47 self.obj = copy.copy(obj)
33 for key in keys:
48 for key in keys:
34 setattr(self.obj, key, can(getattr(obj, key)))
49 setattr(self.obj, key, can(getattr(obj, key)))
50
51 self.buffers = []
35
52
36
53 def get_object(self, g=None):
37 def getObject(self, g=None):
38 if g is None:
54 if g is None:
39 g = globals()
55 g = {}
40 for key in self.keys:
56 for key in self.keys:
41 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
57 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
42 return self.obj
58 return self.obj
59
43
60
44 class Reference(CannedObject):
61 class Reference(CannedObject):
45 """object for wrapping a remote reference by name."""
62 """object for wrapping a remote reference by name."""
@@ -47,13 +64,14 b' class Reference(CannedObject):'
47 if not isinstance(name, basestring):
64 if not isinstance(name, basestring):
48 raise TypeError("illegal name: %r"%name)
65 raise TypeError("illegal name: %r"%name)
49 self.name = name
66 self.name = name
67 self.buffers = []
50
68
51 def __repr__(self):
69 def __repr__(self):
52 return "<Reference: %r>"%self.name
70 return "<Reference: %r>"%self.name
53
71
54 def getObject(self, g=None):
72 def get_object(self, g=None):
55 if g is None:
73 if g is None:
56 g = globals()
74 g = {}
57
75
58 return eval(self.name, g)
76 return eval(self.name, g)
59
77
@@ -61,16 +79,17 b' class Reference(CannedObject):'
61 class CannedFunction(CannedObject):
79 class CannedFunction(CannedObject):
62
80
63 def __init__(self, f):
81 def __init__(self, f):
64 self._checkType(f)
82 self._check_type(f)
65 self.code = f.func_code
83 self.code = f.func_code
66 self.defaults = f.func_defaults
84 self.defaults = f.func_defaults
67 self.module = f.__module__ or '__main__'
85 self.module = f.__module__ or '__main__'
68 self.__name__ = f.__name__
86 self.__name__ = f.__name__
87 self.buffers = []
69
88
70 def _checkType(self, obj):
89 def _check_type(self, obj):
71 assert isinstance(obj, FunctionType), "Not a function type"
90 assert isinstance(obj, FunctionType), "Not a function type"
72
91
73 def getObject(self, g=None):
92 def get_object(self, g=None):
74 # try to load function back into its module:
93 # try to load function back into its module:
75 if not self.module.startswith('__'):
94 if not self.module.startswith('__'):
76 try:
95 try:
@@ -81,30 +100,65 b' class CannedFunction(CannedObject):'
81 g = sys.modules[self.module].__dict__
100 g = sys.modules[self.module].__dict__
82
101
83 if g is None:
102 if g is None:
84 g = globals()
103 g = {}
85 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
104 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
86 return newFunc
105 return newFunc
87
106
107
108 class CannedArray(CannedObject):
109 def __init__(self, obj):
110 self.shape = obj.shape
111 self.dtype = obj.dtype
112 if sum(obj.shape) == 0:
113 # just pickle it
114 self.buffers = [pickle.dumps(obj, -1)]
115 else:
116 # ensure contiguous
117 obj = numpy.ascontiguousarray(obj, dtype=None)
118 self.buffers = [buffer(obj)]
119
120 def get_object(self, g=None):
121 data = self.buffers[0]
122 if sum(self.shape) == 0:
123 # no shape, we just pickled it
124 return pickle.loads(data)
125 else:
126 return numpy.frombuffer(data, dtype=self.dtype).reshape(self.shape)
127
128
129 class CannedBytes(CannedObject):
130 wrap = bytes
131 def __init__(self, obj):
132 self.buffers = [obj]
133
134 def get_object(self, g=None):
135 data = self.buffers[0]
136 return self.wrap(data)
137
138 def CannedBuffer(CannedBytes):
139 wrap = buffer
140
88 #-------------------------------------------------------------------------------
141 #-------------------------------------------------------------------------------
89 # Functions
142 # Functions
90 #-------------------------------------------------------------------------------
143 #-------------------------------------------------------------------------------
91
144
92 def can(obj):
93 # import here to prevent module-level circular imports
94 from IPython.parallel import dependent
95 if isinstance(obj, dependent):
96 keys = ('f','df')
97 return CannedObject(obj, keys=keys)
98 elif isinstance(obj, FunctionType):
99 return CannedFunction(obj)
100 elif isinstance(obj,dict):
101 return canDict(obj)
102 elif isinstance(obj, (list,tuple)):
103 return canSequence(obj)
104 else:
105 return obj
106
145
107 def canDict(obj):
146 def can(obj):
147 """prepare an object for pickling"""
148 for cls,canner in can_map.iteritems():
149 if isinstance(cls, basestring):
150 try:
151 cls = import_item(cls)
152 except Exception:
153 # not importable
154 print "not importable: %r" % cls
155 continue
156 if isinstance(obj, cls):
157 return canner(obj)
158 return obj
159
160 def can_dict(obj):
161 """can the *values* of a dict"""
108 if isinstance(obj, dict):
162 if isinstance(obj, dict):
109 newobj = {}
163 newobj = {}
110 for k, v in obj.iteritems():
164 for k, v in obj.iteritems():
@@ -113,7 +167,8 b' def canDict(obj):'
113 else:
167 else:
114 return obj
168 return obj
115
169
116 def canSequence(obj):
170 def can_sequence(obj):
171 """can the elements of a sequence"""
117 if isinstance(obj, (list, tuple)):
172 if isinstance(obj, (list, tuple)):
118 t = type(obj)
173 t = type(obj)
119 return t([can(i) for i in obj])
174 return t([can(i) for i in obj])
@@ -121,16 +176,20 b' def canSequence(obj):'
121 return obj
176 return obj
122
177
123 def uncan(obj, g=None):
178 def uncan(obj, g=None):
124 if isinstance(obj, CannedObject):
179 """invert canning"""
125 return obj.getObject(g)
180 for cls,uncanner in uncan_map.iteritems():
126 elif isinstance(obj,dict):
181 if isinstance(cls, basestring):
127 return uncanDict(obj, g)
182 try:
128 elif isinstance(obj, (list,tuple)):
183 cls = import_item(cls)
129 return uncanSequence(obj, g)
184 except Exception:
130 else:
185 # not importable
131 return obj
186 print "not importable: %r" % cls
132
187 continue
133 def uncanDict(obj, g=None):
188 if isinstance(obj, cls):
189 return uncanner(obj, g)
190 return obj
191
192 def uncan_dict(obj, g=None):
134 if isinstance(obj, dict):
193 if isinstance(obj, dict):
135 newobj = {}
194 newobj = {}
136 for k, v in obj.iteritems():
195 for k, v in obj.iteritems():
@@ -139,7 +198,7 b' def uncanDict(obj, g=None):'
139 else:
198 else:
140 return obj
199 return obj
141
200
142 def uncanSequence(obj, g=None):
201 def uncan_sequence(obj, g=None):
143 if isinstance(obj, (list, tuple)):
202 if isinstance(obj, (list, tuple)):
144 t = type(obj)
203 t = type(obj)
145 return t([uncan(i,g) for i in obj])
204 return t([uncan(i,g) for i in obj])
@@ -147,5 +206,27 b' def uncanSequence(obj, g=None):'
147 return obj
206 return obj
148
207
149
208
150 def rebindFunctionGlobals(f, glbls):
209 #-------------------------------------------------------------------------------
151 return FunctionType(f.func_code, glbls)
210 # API dictionary
211 #-------------------------------------------------------------------------------
212
213 # These dicts can be extended for custom serialization of new objects
214
215 can_map = {
216 'IPython.parallel.dependent' : lambda obj: CannedObject(obj, keys=('f','df')),
217 'numpy.ndarray' : CannedArray,
218 FunctionType : CannedFunction,
219 bytes : CannedBytes,
220 buffer : CannedBuffer,
221 # dict : can_dict,
222 # list : can_sequence,
223 # tuple : can_sequence,
224 }
225
226 uncan_map = {
227 CannedObject : lambda obj, g: obj.get_object(g),
228 # dict : uncan_dict,
229 # list : uncan_sequence,
230 # tuple : uncan_sequence,
231 }
232
@@ -572,8 +572,8 b' class Kernel(Configurable):'
572 for key in ns.iterkeys():
572 for key in ns.iterkeys():
573 working.pop(key)
573 working.pop(key)
574
574
575 packed_result,buf = serialize_object(result)
575 result_buf = serialize_object(result)
576 result_buf = [packed_result]+buf
576
577 except:
577 except:
578 # invoke IPython traceback formatting
578 # invoke IPython traceback formatting
579 shell.showtraceback()
579 shell.showtraceback()
@@ -32,7 +32,9 b' except:'
32
32
33 # IPython imports
33 # IPython imports
34 from IPython.utils import py3compat
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
35 from IPython.utils.pickleutil import (
36 can, uncan, can_sequence, uncan_sequence, CannedObject
37 )
36 from IPython.utils.newserialized import serialize, unserialize
38 from IPython.utils.newserialized import serialize, unserialize
37
39
38 if py3compat.PY3:
40 if py3compat.PY3:
@@ -42,7 +44,32 b' if py3compat.PY3:'
42 # Serialization Functions
44 # Serialization Functions
43 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
44
46
45 def serialize_object(obj, threshold=64e-6):
47 # maximum items to iterate through in a container
48 MAX_ITEMS = 64
49
50 def _extract_buffers(obj, threshold=1024):
51 """extract buffers larger than a certain threshold"""
52 buffers = []
53 if isinstance(obj, CannedObject) and obj.buffers:
54 for i,buf in enumerate(obj.buffers):
55 if len(buf) > threshold:
56 # buffer larger than threshold, prevent pickling
57 obj.buffers[i] = None
58 buffers.append(buf)
59 elif isinstance(buf, buffer):
60 # buffer too small for separate send, coerce to bytes
61 # because pickling buffer objects just results in broken pointers
62 obj.buffers[i] = bytes(buf)
63 return buffers
64
65 def _restore_buffers(obj, buffers):
66 """restore buffers extracted by """
67 if isinstance(obj, CannedObject) and obj.buffers:
68 for i,buf in enumerate(obj.buffers):
69 if buf is None:
70 obj.buffers[i] = buffers.pop(0)
71
72 def serialize_object(obj, threshold=1024):
46 """Serialize an object into a list of sendable buffers.
73 """Serialize an object into a list of sendable buffers.
47
74
48 Parameters
75 Parameters
@@ -50,76 +77,78 b' def serialize_object(obj, threshold=64e-6):'
50
77
51 obj : object
78 obj : object
52 The object to be serialized
79 The object to be serialized
53 threshold : float
80 threshold : int
54 The threshold for not double-pickling the content.
81 The threshold (in bytes) for pulling out data buffers
55
82 to avoid pickling them.
56
83
57 Returns
84 Returns
58 -------
85 -------
59 ('pmd', [bufs]) :
86 [bufs] : list of buffers representing the serialized object.
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
87 """
63 databuffers = []
88 buffers = []
64 if isinstance(obj, (list, tuple)):
89 if isinstance(obj, (list, tuple)) and len(obj) < MAX_ITEMS:
65 clist = canSequence(obj)
90 cobj = can_sequence(obj)
66 slist = map(serialize, clist)
91 for c in cobj:
67 for s in slist:
92 buffers.extend(_extract_buffers(c, threshold))
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
93 elif isinstance(obj, dict) and len(obj) < MAX_ITEMS:
69 databuffers.append(s.getData())
94 cobj = {}
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
95 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
96 c = can(obj[k])
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
97 buffers.extend(_extract_buffers(c, threshold))
77 databuffers.append(s.getData())
98 cobj[k] = c
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
99 else:
82 s = serialize(can(obj))
100 cobj = can(obj)
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
101 buffers.extend(_extract_buffers(cobj, threshold))
84 databuffers.append(s.getData())
102
85 s.data = None
103 buffers.insert(0, pickle.dumps(cobj,-1))
86 return pickle.dumps(s,-1),databuffers
104 return buffers
87
105
88
106 def unserialize_object(buffers, g=None):
89 def unserialize_object(bufs):
107 """reconstruct an object serialized by serialize_object from data buffers.
90 """reconstruct an object serialized by serialize_object from data buffers."""
108
91 bufs = list(bufs)
109 Parameters
92 sobj = pickle.loads(bufs.pop(0))
110 ----------
93 if isinstance(sobj, (list, tuple)):
111
94 for s in sobj:
112 bufs : list of buffers/bytes
95 if s.data is None:
113
96 s.data = bufs.pop(0)
114 g : globals to be used when uncanning
97 return uncanSequence(map(unserialize, sobj)), bufs
115
98 elif isinstance(sobj, dict):
116 Returns
117 -------
118
119 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
120 """
121 bufs = list(buffers)
122 canned = pickle.loads(bufs.pop(0))
123 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
124 for c in canned:
125 _restore_buffers(c, bufs)
126 newobj = uncan_sequence(canned, g)
127 elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
99 newobj = {}
128 newobj = {}
100 for k in sorted(sobj.iterkeys()):
129 for k in sorted(canned.iterkeys()):
101 s = sobj[k]
130 c = canned[k]
102 if s.data is None:
131 _restore_buffers(c, bufs)
103 s.data = bufs.pop(0)
132 newobj[k] = uncan(c, g)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
133 else:
107 if sobj.data is None:
134 _restore_buffers(canned, bufs)
108 sobj.data = bufs.pop(0)
135 newobj = uncan(canned, g)
109 return uncan(unserialize(sobj)), bufs
136
137 return newobj, bufs
110
138
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
139 def pack_apply_message(f, args, kwargs, threshold=1024):
112 """pack up a function, args, and kwargs to be sent over the wire
140 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
141 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have their data copied (currently only numpy arrays support zero-copy)"""
142 will not have their data copied (currently only numpy arrays support zero-copy)
143 """
115 msg = [pickle.dumps(can(f),-1)]
144 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
145 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
146 sargs = serialize_object(args,threshold)
118 msg.append(sargs)
147 msg.append(sargs[0])
119 databuffers.extend(bufs)
148 databuffers.extend(sargs[1:])
120 skwargs, bufs = serialize_object(kwargs,threshold)
149 skwargs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
150 msg.append(skwargs[0])
122 databuffers.extend(bufs)
151 databuffers.extend(skwargs[1:])
123 msg.extend(databuffers)
152 msg.extend(databuffers)
124 return msg
153 return msg
125
154
@@ -131,49 +160,16 b' def unpack_apply_message(bufs, g=None, copy=True):'
131 if not copy:
160 if not copy:
132 for i in range(3):
161 for i in range(3):
133 bufs[i] = bufs[i].bytes
162 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
163 f = uncan(pickle.loads(bufs.pop(0)), g)
135 sargs = list(pickle.loads(bufs.pop(0)))
164 # sargs = bufs.pop(0)
136 skwargs = dict(pickle.loads(bufs.pop(0)))
165 # pop kwargs out, so first n-elements are args, serialized
137 # print sargs, skwargs
166 skwargs = bufs.pop(1)
138 f = uncan(cf, g)
167 args, bufs = unserialize_object(bufs, g)
139 for sa in sargs:
168 # put skwargs back in as the first element
140 if sa.data is None:
169 bufs.insert(0, skwargs)
141 m = bufs.pop(0)
170 kwargs, bufs = unserialize_object(bufs, g)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
171
156 args = uncanSequence(map(unserialize, sargs), g)
172 assert not bufs, "Shouldn't be any data left over"
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
173
178 return f,args,kwargs
174 return f,args,kwargs
179
175
General Comments 0
You need to be logged in to leave comments. Login now