##// END OF EJS Templates
cast buffers to bytes on Python 2 for pickle.loads...
Min RK -
Show More
@@ -1,185 +1,181 b''
1 1 """serialization utilities for apply messages"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 try:
7 7 import cPickle
8 8 pickle = cPickle
9 9 except:
10 10 cPickle = None
11 11 import pickle
12 12
13 13 # IPython imports
14 from IPython.utils import py3compat
14 from IPython.utils.py3compat import PY3, buffer_to_bytes_py2
15 15 from IPython.utils.data import flatten
16 16 from IPython.utils.pickleutil import (
17 17 can, uncan, can_sequence, uncan_sequence, CannedObject,
18 18 istype, sequence_types, PICKLE_PROTOCOL,
19 19 )
20 20
21 if py3compat.PY3:
21 if PY3:
22 22 buffer = memoryview
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Serialization Functions
26 26 #-----------------------------------------------------------------------------
27 27
28 28 # default values for the thresholds:
29 29 MAX_ITEMS = 64
30 30 MAX_BYTES = 1024
31 31
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull data buffers above *threshold* out of a canned object.

    Oversized buffers are replaced by ``None`` placeholders (so they are
    not pickled) and returned for separate zero-copy transport; small
    buffer/memoryview entries are coerced to bytes, because pickling raw
    buffer objects just serializes broken pointers.
    """
    extracted = []
    if not (isinstance(obj, CannedObject) and obj.buffers):
        return extracted
    for idx, data in enumerate(obj.buffers):
        if len(data) > threshold:
            # too large to pickle inline; ship separately
            obj.buffers[idx] = None
            extracted.append(data)
        elif isinstance(data, buffer):
            # small enough to inline, but must be real bytes to pickle safely
            obj.buffers[idx] = bytes(data)
    return extracted
46 46
def _restore_buffers(obj, buffers):
    """Reinsert buffers previously removed by ``_extract_buffers``.

    ``None`` placeholders in ``obj.buffers`` are filled, in order, by
    popping from the front of *buffers*.
    """
    if isinstance(obj, CannedObject) and obj.buffers:
        for idx, data in enumerate(obj.buffers):
            if data is None:
                obj.buffers[idx] = buffers.pop(0)
53 53
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------

    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Containers (lists, dicts) larger than this will be pickled without
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
    """
    bufs = []
    # small sequences and dicts are canned element-wise so that large
    # members can have their buffers extracted individually
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        canned = can_sequence(obj)
        for element in canned:
            bufs.extend(_extract_buffers(element, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        canned = {}
        # sorted for deterministic buffer order across processes
        for key in sorted(obj):
            value = can(obj[key])
            bufs.extend(_extract_buffers(value, buffer_threshold))
            canned[key] = value
    else:
        canned = can(obj)
        bufs.extend(_extract_buffers(canned, buffer_threshold))

    # the pickle of the canned container always travels first
    bufs.insert(0, pickle.dumps(canned, PICKLE_PROTOCOL))
    return bufs
91 91
def deserialize_object(buffers, g=None):
    """Reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------

    bufs : list of buffers/bytes

    g : globals to be used when uncanning

    Returns
    -------

    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    # first buffer is the pickle of the canned container;
    # normalize zmq frames / buffers to bytes for pickle.loads on Python 2
    canned = pickle.loads(buffer_to_bytes_py2(bufs.pop(0)))
    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for element in canned:
            _restore_buffers(element, bufs)
        newobj = uncan_sequence(canned, g)
    elif istype(canned, dict) and len(canned) < MAX_ITEMS:
        newobj = {}
        # same sorted order as serialize_object, so buffers line up
        for key in sorted(canned):
            value = canned[key]
            _restore_buffers(value, bufs)
            newobj[key] = uncan(value, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
128 125
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """pack up a function, args, and kwargs to be sent over the wire

    Each element of args/kwargs will be canned for special treatment,
    but inspection will not go any deeper than that.

    Any object whose data is larger than `threshold` will not have their data copied
    (only numpy arrays and bytes/buffers support zero-copy)

    Message will be a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    With length at least two + len(args) + len(kwargs)
    """
    arg_bufs = flatten(
        serialize_object(a, buffer_threshold, item_threshold) for a in args
    )

    # sorted keys give a deterministic kwarg buffer order for the receiver
    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = flatten(
        serialize_object(kwargs[k], buffer_threshold, item_threshold) for k in kw_keys
    )

    info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)

    # message layout: canned function, info dict, then arg/kwarg buffers
    msg = [
        pickle.dumps(can(f), PICKLE_PROTOCOL),
        pickle.dumps(info, PICKLE_PROTOCOL),
    ]
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)

    return msg
158 155
def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()

    Returns: original f,args,kwargs.

    The *copy* flag is retained for backward compatibility; buffers are
    normalized to bytes via buffer_to_bytes_py2 regardless.
    """
    remaining = list(bufs)  # work on a copy we can pop from
    assert len(remaining) >= 2, "not enough buffers!"
    f = uncan(pickle.loads(buffer_to_bytes_py2(remaining.pop(0))), g)
    info = pickle.loads(buffer_to_bytes_py2(remaining.pop(0)))
    split = info['narg_bufs']
    arg_bufs, kwarg_bufs = remaining[:split], remaining[split:]

    args = []
    for _ in range(info['nargs']):
        arg, arg_bufs = deserialize_object(arg_bufs, g)
        args.append(arg)
    args = tuple(args)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for key in info['kw_keys']:
        value, kwarg_bufs = deserialize_object(kwarg_bufs, g)
        kwargs[key] = value
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f,args,kwargs
185 181
@@ -1,425 +1,425 b''
1 1 # encoding: utf-8
2 2 """Pickle related utilities. Perhaps this should be called 'can'."""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 import copy
8 8 import logging
9 9 import sys
10 10 from types import FunctionType
11 11
12 12 try:
13 13 import cPickle as pickle
14 14 except ImportError:
15 15 import pickle
16 16
17 17 from . import codeutil # This registers a hook when it's imported
18 18 from . import py3compat
19 19 from .importstring import import_item
20 from .py3compat import string_types, iteritems
20 from .py3compat import string_types, iteritems, buffer_to_bytes_py2
21 21
22 22 from IPython.config import Application
23 23 from IPython.utils.log import get_logger
24 24
25 25 if py3compat.PY3:
26 26 buffer = memoryview
27 27 class_type = type
28 28 else:
29 29 from types import ClassType
30 30 class_type = (type, ClassType)
31 31
32 32 try:
33 33 PICKLE_PROTOCOL = pickle.DEFAULT_PROTOCOL
34 34 except AttributeError:
35 35 PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL
36 36
def _get_cell_type(a=None):
    """Return the type of a closure cell.

    The cell type doesn't seem to be importable, so build a throwaway
    closure and inspect it.
    """
    def probe():
        return a
    return type(py3compat.get_closure(probe)[0])

cell_type = _get_cell_type()
46 46
47 47 #-------------------------------------------------------------------------------
48 48 # Functions
49 49 #-------------------------------------------------------------------------------
50 50
51 51
def use_dill():
    """use dill to expand serialization support

    adds support for object methods and closures to serialization.
    """
    # importing dill installs most of its magic
    import dill

    # dill doesn't work with cPickle: point both relevant modules at dill
    global pickle
    pickle = dill

    try:
        from IPython.kernel.zmq import serialize
    except ImportError:
        pass
    else:
        serialize.pickle = dill

    # drop special function handling; dill handles functions itself
    can_map.pop(FunctionType, None)
75 75
def use_cloudpickle():
    """use cloudpickle to expand serialization support

    adds support for object methods and closures to serialization.
    """
    from cloud.serialization import cloudpickle

    # point both relevant modules at cloudpickle
    global pickle
    pickle = cloudpickle

    try:
        from IPython.kernel.zmq import serialize
    except ImportError:
        pass
    else:
        serialize.pickle = cloudpickle

    # drop special function handling; cloudpickle handles functions itself
    can_map.pop(FunctionType, None)
95 95
96 96
97 97 #-------------------------------------------------------------------------------
98 98 # Classes
99 99 #-------------------------------------------------------------------------------
100 100
101 101
class CannedObject(object):
    def __init__(self, obj, keys=(), hook=None):
        """can an object for safe pickling

        Parameters
        ==========

        obj:
            The object to be canned
        keys: list (optional)
            list of attribute names that will be explicitly canned / uncanned
        hook: callable (optional)
            An optional extra callable,
            which can do additional processing of the uncanned object.

        large data may be offloaded into the buffers list,
        used for zero-copy transfers.
        """
        # NOTE: default changed from `[]` to `()` — an immutable default
        # avoids the shared-mutable-default-argument pitfall; iteration
        # behavior is identical for callers.
        self.keys = keys
        # shallow copy so canning attributes doesn't mutate the caller's object
        self.obj = copy.copy(obj)
        self.hook = can(hook)
        for key in keys:
            setattr(self.obj, key, can(getattr(obj, key)))

        self.buffers = []

    def get_object(self, g=None):
        """Uncan and return the wrapped object, resolving names in *g*."""
        if g is None:
            g = {}
        obj = self.obj
        # uncan the explicitly-canned attributes in place
        for key in self.keys:
            setattr(obj, key, uncan(getattr(obj, key), g))

        if self.hook:
            # hook gets a chance to post-process the uncanned object
            self.hook = uncan(self.hook, g)
            self.hook(obj, g)
        return self.obj
139 139
140 140
class Reference(CannedObject):
    """object for wrapping a remote reference by name."""

    def __init__(self, name):
        if not isinstance(name, string_types):
            raise TypeError("illegal name: %r"%name)
        self.name = name
        self.buffers = []

    def __repr__(self):
        return "<Reference: %r>"%self.name

    def get_object(self, g=None):
        # resolve the name in the target namespace at uncan time
        namespace = {} if g is None else g
        return eval(self.name, namespace)
157 157
158 158
class CannedCell(CannedObject):
    """Can a closure cell"""

    def __init__(self, cell):
        self.cell_contents = can(cell.cell_contents)

    def get_object(self, g=None):
        contents = uncan(self.cell_contents, g)
        # rebuild a real cell by closing over the uncanned contents
        def inner():
            return contents
        return py3compat.get_closure(inner)[0]
169 169
170 170
class CannedFunction(CannedObject):
    """Can a plain function: code, defaults, and closure cells."""

    def __init__(self, f):
        self._check_type(f)
        self.code = f.__code__
        # defaults and closure cells may themselves need canning
        self.defaults = (
            [can(d) for d in f.__defaults__] if f.__defaults__ else None
        )
        cells = py3compat.get_closure(f)
        self.closure = tuple(can(cell) for cell in cells) if cells else None

        self.module = f.__module__ or '__main__'
        self.__name__ = f.__name__
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, FunctionType), "Not a function type"

    def get_object(self, g=None):
        # prefer loading the function back into its defining module
        if not self.module.startswith('__'):
            __import__(self.module)
            g = sys.modules[self.module].__dict__

        if g is None:
            g = {}
        defaults = (
            tuple(uncan(d, g) for d in self.defaults) if self.defaults else None
        )
        closure = (
            tuple(uncan(cell, g) for cell in self.closure) if self.closure else None
        )
        return FunctionType(self.code, g, self.__name__, defaults, closure)
212 212
class CannedClass(CannedObject):
    """Can a class (including old-style classes on Python 2)."""

    def __init__(self, cls):
        self._check_type(cls)
        self.name = cls.__name__
        self.old_style = not isinstance(cls, type)
        # can every class attribute except the uncannable machinery slots
        self._canned_dict = {}
        for attr, value in cls.__dict__.items():
            if attr not in ('__weakref__', '__dict__'):
                self._canned_dict[attr] = can(value)
        # old-style classes have no mro()
        mro = [] if self.old_style else cls.mro()
        self.parents = [can(parent) for parent in mro[1:]]
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, class_type), "Not a class type"

    def get_object(self, g=None):
        bases = tuple(uncan(parent, g) for parent in self.parents)
        return type(self.name, bases, uncan_dict(self._canned_dict, g=g))
237 237
class CannedArray(CannedObject):
    """Can a numpy array, using zero-copy buffers where possible."""

    def __init__(self, obj):
        from numpy import ascontiguousarray
        self.shape = obj.shape
        self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
        # decide whether the buffer approach works, or we must pickle:
        # empty arrays, object dtype, or structured dtypes containing
        # object fields can't be represented by a raw buffer
        if sum(obj.shape) == 0:
            self.pickled = True
        elif obj.dtype == 'O':
            self.pickled = True
        elif obj.dtype.fields and any(dt == 'O' for dt, sz in obj.dtype.fields.values()):
            self.pickled = True
        else:
            self.pickled = False

        if self.pickled:
            # just pickle it
            self.buffers = [pickle.dumps(obj, PICKLE_PROTOCOL)]
        else:
            # ensure contiguous memory, then expose it as a buffer
            self.buffers = [buffer(ascontiguousarray(obj, dtype=None))]

    def get_object(self, g=None):
        from numpy import frombuffer
        data = self.buffers[0]
        if self.pickled:
            # we just pickled it; normalize to bytes for Python 2
            return pickle.loads(buffer_to_bytes_py2(data))
        return frombuffer(data, dtype=self.dtype).reshape(self.shape)
267 267
268 268
class CannedBytes(CannedObject):
    """Can a bytes object by carrying it as a single zero-copy buffer."""

    # subclasses may override to rewrap the data differently
    wrap = bytes

    def __init__(self, obj):
        self.buffers = [obj]

    def get_object(self, g=None):
        return self.wrap(self.buffers[0])
277 277
class CannedBuffer(CannedBytes):
    """CannedBytes variant that rewraps as buffer (memoryview on Python 3).

    BUG FIX: this was previously declared with ``def`` instead of
    ``class``, which made CannedBuffer a function returning None —
    so the ``wrap = buffer`` override never took effect and the
    can_map entry for buffer called a no-op.
    """
    wrap = buffer
280 280
281 281 #-------------------------------------------------------------------------------
282 282 # Functions
283 283 #-------------------------------------------------------------------------------
284 284
def _import_mapping(mapping, original=None):
    """Resolve any string keys in a can/uncan type mapping to real classes.

    Keys that fail to import are dropped; failures are only logged for
    user-added entries (those not present in *original*).
    """
    log = get_logger()
    log.debug("Importing canning map")
    for key, value in list(mapping.items()):
        if not isinstance(key, string_types):
            continue
        try:
            cls = import_item(key)
        except Exception:
            if original and key not in original:
                # only message on user-added classes
                log.error("canning class not importable: %r", key, exc_info=True)
            mapping.pop(key)
        else:
            mapping[cls] = mapping.pop(key)
302 302
def istype(obj, check):
    """like isinstance(obj, check), but strict

    This won't catch subclasses: only an exact type match counts.
    """
    if isinstance(check, tuple):
        return any(type(obj) is cls for cls in check)
    return type(obj) is check
315 315
def can(obj):
    """prepare an object for pickling"""
    need_import = False

    for cls, canner in iteritems(can_map):
        if isinstance(cls, string_types):
            # map still holds unresolved string keys
            need_import = True
            break
        elif istype(obj, cls):
            return canner(obj)

    if need_import:
        # resolve string keys in can_map, then retry;
        # this usually only happens once
        _import_mapping(can_map, _original_can_map)
        return can(obj)

    return obj
335 335
def can_class(obj):
    """Can interactively-defined classes (those living in __main__)."""
    if isinstance(obj, class_type) and obj.__module__ == '__main__':
        return CannedClass(obj)
    return obj
341 341
def can_dict(obj):
    """can the *values* of a dict"""
    if not istype(obj, dict):
        return obj
    return dict((key, can(value)) for key, value in iteritems(obj))
351 351
# exact container types that are canned element-wise
sequence_types = (list, tuple, set)

def can_sequence(obj):
    """can the elements of a sequence"""
    if istype(obj, sequence_types):
        # rebuild with the same concrete container type
        return type(obj)(can(item) for item in obj)
    return obj
361 361
def uncan(obj, g=None):
    """invert canning"""
    need_import = False
    for cls, uncanner in iteritems(uncan_map):
        if isinstance(cls, string_types):
            # map still holds unresolved string keys
            need_import = True
            break
        elif isinstance(obj, cls):
            return uncanner(obj, g)

    if need_import:
        # resolve string keys in uncan_map, then retry;
        # this usually only happens once
        _import_mapping(uncan_map, _original_uncan_map)
        return uncan(obj, g)

    return obj
380 380
def uncan_dict(obj, g=None):
    """uncan the *values* of a dict"""
    if not istype(obj, dict):
        return obj
    return dict((key, uncan(value, g)) for key, value in iteritems(obj))
389 389
def uncan_sequence(obj, g=None):
    """uncan the elements of a sequence"""
    if istype(obj, sequence_types):
        # rebuild with the same concrete container type
        return type(obj)(uncan(item, g) for item in obj)
    return obj
396 396
def _uncan_dependent_hook(dep, g=None):
    """Post-uncan hook: verify the dependent's dependencies are met."""
    dep.check_dependency()

def can_dependent(obj):
    """Can an IPython.parallel dependent, checking deps on uncan."""
    return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
402 402
403 403 #-------------------------------------------------------------------------------
404 404 # API dictionaries
405 405 #-------------------------------------------------------------------------------
406 406
407 407 # These dicts can be extended for custom serialization of new objects
408 408
# Type -> canner dispatch table; extend for custom serialization.
# String keys are lazily imported by _import_mapping on first use.
can_map = {
    'IPython.parallel.dependent' : can_dependent,
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
    cell_type : CannedCell,
    class_type : can_class,
}

# Type -> uncanner dispatch table (isinstance-based, unlike can_map)
uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}

# snapshots of the default maps, so _import_mapping can distinguish
# user-registered entries from the built-in ones
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
General Comments 0
You need to be logged in to leave comments. Login now