##// END OF EJS Templates
Use istype() when checking if canned object is a dict...
Stephan Rave -
Show More
@@ -1,198 +1,198 b''
1 1 """serialization utilities for apply messages
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 try:
19 19 import cPickle
20 20 pickle = cPickle
21 21 except:
22 22 cPickle = None
23 23 import pickle
24 24
25 25
26 26 # IPython imports
27 27 from IPython.utils import py3compat
28 28 from IPython.utils.data import flatten
29 29 from IPython.utils.pickleutil import (
30 30 can, uncan, can_sequence, uncan_sequence, CannedObject,
31 31 istype, sequence_types,
32 32 )
33 33
34 34 if py3compat.PY3:
35 35 buffer = memoryview
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Serialization Functions
39 39 #-----------------------------------------------------------------------------
40 40
41 41 # default values for the thresholds:
42 42 MAX_ITEMS = 64
43 43 MAX_BYTES = 1024
44 44
45 45 def _extract_buffers(obj, threshold=MAX_BYTES):
46 46 """extract buffers larger than a certain threshold"""
47 47 buffers = []
48 48 if isinstance(obj, CannedObject) and obj.buffers:
49 49 for i,buf in enumerate(obj.buffers):
50 50 if len(buf) > threshold:
51 51 # buffer larger than threshold, prevent pickling
52 52 obj.buffers[i] = None
53 53 buffers.append(buf)
54 54 elif isinstance(buf, buffer):
55 55 # buffer too small for separate send, coerce to bytes
56 56 # because pickling buffer objects just results in broken pointers
57 57 obj.buffers[i] = bytes(buf)
58 58 return buffers
59 59
60 60 def _restore_buffers(obj, buffers):
61 61 """restore buffers extracted by """
62 62 if isinstance(obj, CannedObject) and obj.buffers:
63 63 for i,buf in enumerate(obj.buffers):
64 64 if buf is None:
65 65 obj.buffers[i] = buffers.pop(0)
66 66
67 67 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
68 68 """Serialize an object into a list of sendable buffers.
69 69
70 70 Parameters
71 71 ----------
72 72
73 73 obj : object
74 74 The object to be serialized
75 75 buffer_threshold : int
76 76 The threshold (in bytes) for pulling out data buffers
77 77 to avoid pickling them.
78 78 item_threshold : int
79 79 The maximum number of items over which canning will iterate.
80 80 Containers (lists, dicts) larger than this will be pickled without
81 81 introspection.
82 82
83 83 Returns
84 84 -------
85 85 [bufs] : list of buffers representing the serialized object.
86 86 """
87 87 buffers = []
88 88 if istype(obj, sequence_types) and len(obj) < item_threshold:
89 89 cobj = can_sequence(obj)
90 90 for c in cobj:
91 91 buffers.extend(_extract_buffers(c, buffer_threshold))
92 92 elif istype(obj, dict) and len(obj) < item_threshold:
93 93 cobj = {}
94 94 for k in sorted(obj.iterkeys()):
95 95 c = can(obj[k])
96 96 buffers.extend(_extract_buffers(c, buffer_threshold))
97 97 cobj[k] = c
98 98 else:
99 99 cobj = can(obj)
100 100 buffers.extend(_extract_buffers(cobj, buffer_threshold))
101 101
102 102 buffers.insert(0, pickle.dumps(cobj,-1))
103 103 return buffers
104 104
105 105 def unserialize_object(buffers, g=None):
106 106 """reconstruct an object serialized by serialize_object from data buffers.
107 107
108 108 Parameters
109 109 ----------
110 110
111 111 bufs : list of buffers/bytes
112 112
113 113 g : globals to be used when uncanning
114 114
115 115 Returns
116 116 -------
117 117
118 118 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
119 119 """
120 120 bufs = list(buffers)
121 121 pobj = bufs.pop(0)
122 122 if not isinstance(pobj, bytes):
123 123 # a zmq message
124 124 pobj = bytes(pobj)
125 125 canned = pickle.loads(pobj)
126 126 if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
127 127 for c in canned:
128 128 _restore_buffers(c, bufs)
129 129 newobj = uncan_sequence(canned, g)
130 elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
130 elif istype(canned, dict) and len(canned) < MAX_ITEMS:
131 131 newobj = {}
132 132 for k in sorted(canned.iterkeys()):
133 133 c = canned[k]
134 134 _restore_buffers(c, bufs)
135 135 newobj[k] = uncan(c, g)
136 136 else:
137 137 _restore_buffers(canned, bufs)
138 138 newobj = uncan(canned, g)
139 139
140 140 return newobj, bufs
141 141
142 142 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
143 143 """pack up a function, args, and kwargs to be sent over the wire
144 144
145 145 Each element of args/kwargs will be canned for special treatment,
146 146 but inspection will not go any deeper than that.
147 147
148 148 Any object whose data is larger than `threshold` will not have their data copied
149 149 (only numpy arrays and bytes/buffers support zero-copy)
150 150
151 151 Message will be a list of bytes/buffers of the format:
152 152
153 153 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
154 154
155 155 With length at least two + len(args) + len(kwargs)
156 156 """
157 157
158 158 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
159 159
160 160 kw_keys = sorted(kwargs.keys())
161 161 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
162 162
163 163 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
164 164
165 165 msg = [pickle.dumps(can(f),-1)]
166 166 msg.append(pickle.dumps(info, -1))
167 167 msg.extend(arg_bufs)
168 168 msg.extend(kwarg_bufs)
169 169
170 170 return msg
171 171
172 172 def unpack_apply_message(bufs, g=None, copy=True):
173 173 """unpack f,args,kwargs from buffers packed by pack_apply_message()
174 174 Returns: original f,args,kwargs"""
175 175 bufs = list(bufs) # allow us to pop
176 176 assert len(bufs) >= 2, "not enough buffers!"
177 177 if not copy:
178 178 for i in range(2):
179 179 bufs[i] = bufs[i].bytes
180 180 f = uncan(pickle.loads(bufs.pop(0)), g)
181 181 info = pickle.loads(bufs.pop(0))
182 182 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
183 183
184 184 args = []
185 185 for i in range(info['nargs']):
186 186 arg, arg_bufs = unserialize_object(arg_bufs, g)
187 187 args.append(arg)
188 188 args = tuple(args)
189 189 assert not arg_bufs, "Shouldn't be any arg bufs left over"
190 190
191 191 kwargs = {}
192 192 for key in info['kw_keys']:
193 193 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
194 194 kwargs[key] = kwarg
195 195 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
196 196
197 197 return f,args,kwargs
198 198
General Comments 0
You need to be logged in to leave comments. Login now