##// END OF EJS Templates
use istype instead of isinstance for canning tuples/lists...
MinRK -
Show More
@@ -1,204 +1,205 b''
1 1 """serialization utilities for apply messages
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import logging
20 20 import os
21 21 import re
22 22 import socket
23 23 import sys
24 24
25 25 try:
26 26 import cPickle
27 27 pickle = cPickle
28 28 except:
29 29 cPickle = None
30 30 import pickle
31 31
32 32
33 33 # IPython imports
34 34 from IPython.utils import py3compat
35 35 from IPython.utils.data import flatten
36 36 from IPython.utils.pickleutil import (
37 can, uncan, can_sequence, uncan_sequence, CannedObject
37 can, uncan, can_sequence, uncan_sequence, CannedObject,
38 istype, sequence_types,
38 39 )
39 40
40 41 if py3compat.PY3:
41 42 buffer = memoryview
42 43
43 44 #-----------------------------------------------------------------------------
44 45 # Serialization Functions
45 46 #-----------------------------------------------------------------------------
46 47
# default values for the thresholds:
MAX_ITEMS = 64    # containers with at least this many items are pickled whole, without per-item canning
MAX_BYTES = 1024  # buffers larger than this many bytes are extracted and sent separately
50 51
def _extract_buffers(obj, threshold=MAX_BYTES):
    """extract buffers larger than a certain threshold"""
    extracted = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for idx, buf in enumerate(obj.buffers):
            if len(buf) > threshold:
                # too big to pickle: pull it out and leave a None
                # placeholder for _restore_buffers to refill later
                extracted.append(buf)
                obj.buffers[idx] = None
            elif isinstance(buf, buffer):
                # small buffer objects pickle as broken pointers,
                # so coerce them to bytes for inline transport
                obj.buffers[idx] = bytes(buf)
    return extracted
65 66
def _restore_buffers(obj, buffers):
    """restore buffers extracted by _extract_buffers"""
    if isinstance(obj, CannedObject) and obj.buffers:
        for idx, buf in enumerate(obj.buffers):
            if buf is None:
                # None placeholder left by _extract_buffers: refill in order
                obj.buffers[idx] = buffers.pop(0)
72 73
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------

    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Containers (lists, dicts) larger than this will be pickled without
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
    """
    bufs = []
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        # can each element, extracting oversized buffers as we go
        cobj = can_sequence(obj)
        for canned in cobj:
            bufs.extend(_extract_buffers(canned, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        # can each value; keys are visited in sorted order for determinism
        cobj = {}
        for key in sorted(obj):
            canned = can(obj[key])
            bufs.extend(_extract_buffers(canned, buffer_threshold))
            cobj[key] = canned
    else:
        # too large (or not an exact list/tuple/set/dict): can it wholesale
        cobj = can(obj)
        bufs.extend(_extract_buffers(cobj, buffer_threshold))

    # the pickled canned object always rides in slot 0
    bufs.insert(0, pickle.dumps(cobj, -1))
    return bufs
110 111
def unserialize_object(buffers, g=None):
    """reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------

    bufs : list of buffers/bytes

    g : globals to be used when uncanning

    Returns
    -------

    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    pobj = bufs.pop(0)
    if not isinstance(pobj, bytes):
        # a zmq message: copy out the bytes before unpickling
        pobj = bytes(pobj)
    canned = pickle.loads(pobj)
    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for elem in canned:
            _restore_buffers(elem, bufs)
        newobj = uncan_sequence(canned, g)
    elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
        # same sorted-key order as serialize_object used, so buffers line up
        newobj = {}
        for key in sorted(canned):
            elem = canned[key]
            _restore_buffers(elem, bufs)
            newobj[key] = uncan(elem, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
147 148
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """pack up a function, args, and kwargs to be sent over the wire

    Each element of args/kwargs will be canned for special treatment,
    but inspection will not go any deeper than that.

    Any object whose data is larger than `threshold` will not have their data copied
    (only numpy arrays and bytes/buffers support zero-copy)

    Message will be a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    With length at least two + len(args) + len(kwargs)
    """
    arg_bufs = flatten(
        serialize_object(a, buffer_threshold, item_threshold) for a in args)

    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = flatten(
        serialize_object(kwargs[k], buffer_threshold, item_threshold) for k in kw_keys)

    # header: arg count, how many buffers the args occupy, and the sorted
    # keyword names, so the receiver can split the buffer stream back up
    info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)

    msg = [pickle.dumps(can(f), -1), pickle.dumps(info, -1)]
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)

    return msg
177 178
def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()
    Returns: original f,args,kwargs"""
    bufs = list(bufs)  # local copy, so we can pop
    assert len(bufs) >= 2, "not enough buffers!"
    if not copy:
        # non-copying recv gives zmq Message frames: take their bytes
        for i in range(2):
            bufs[i] = bufs[i].bytes
    f = uncan(pickle.loads(bufs.pop(0)), g)
    info = pickle.loads(bufs.pop(0))
    # split the remaining stream into positional and keyword buffers
    arg_bufs = bufs[:info['narg_bufs']]
    kwarg_bufs = bufs[info['narg_bufs']:]

    args = []
    for _ in range(info['nargs']):
        arg, arg_bufs = unserialize_object(arg_bufs, g)
        args.append(arg)
    args = tuple(args)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for key in info['kw_keys']:
        value, kwarg_bufs = unserialize_object(kwarg_bufs, g)
        kwargs[key] = value
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, args, kwargs
204 205
@@ -1,325 +1,328 b''
1 1 # encoding: utf-8
2 2
3 3 """Pickle related utilities. Perhaps this should be called 'can'."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import sys
21 from collections import namedtuple
21 22 from types import FunctionType
22 23
23 24 try:
24 25 import cPickle as pickle
25 26 except ImportError:
26 27 import pickle
27 28
28 29 try:
29 30 import numpy
30 31 except:
31 32 numpy = None
32 33
33 34 import codeutil
34 35 import py3compat
35 36 from importstring import import_item
36 37
37 38 from IPython.config import Application
38 39
39 40 if py3compat.PY3:
40 41 buffer = memoryview
41 42 class_type = type
42 43 else:
43 44 from types import ClassType
44 45 class_type = (type, ClassType)
45 46
46 47 #-------------------------------------------------------------------------------
47 48 # Classes
48 49 #-------------------------------------------------------------------------------
49 50
50 51
class CannedObject(object):
    """Base class for canned (pickle-safe) object wrappers.

    `keys` names attributes of `obj` that must themselves be canned;
    they are canned on the way in and uncanned again by `get_object`.
    """
    def __init__(self, obj, keys=None):
        # fix: the original used a mutable default argument (keys=[]);
        # None-as-default avoids any risk of cross-instance sharing
        self.keys = [] if keys is None else keys
        self.obj = copy.copy(obj)
        for key in self.keys:
            setattr(self.obj, key, can(getattr(obj, key)))

        self.buffers = []

    def get_object(self, g=None):
        """Restore the wrapped object, uncanning the canned attributes.

        g : namespace (dict) used when uncanning attribute values.
        """
        if g is None:
            g = {}
        for key in self.keys:
            setattr(self.obj, key, uncan(getattr(self.obj, key), g))
        return self.obj
66 67
67 68
class Reference(CannedObject):
    """object for wrapping a remote reference by name."""
    def __init__(self, name):
        if not isinstance(name, basestring):
            raise TypeError("illegal name: %r"%name)
        self.name = name
        self.buffers = []

    def __repr__(self):
        return "<Reference: %r>"%self.name

    def get_object(self, g=None):
        g = {} if g is None else g
        # resolve the stored name in the supplied namespace at uncan time
        return eval(self.name, g)
84 85
85 86
class CannedFunction(CannedObject):
    """Can a plain function by capturing its code object and defaults.

    NOTE(review): the closure is not captured (FunctionType is rebuilt
    without one), so functions closing over outer variables will not
    round-trip — confirm callers only send closure-free functions.
    """

    def __init__(self, f):
        self._check_type(f)
        # fix: use __code__/__defaults__ (valid on Python 2.6+ AND 3.x)
        # instead of the py2-only func_code/func_defaults — this file
        # otherwise supports PY3 via py3compat
        self.code = f.__code__
        if f.__defaults__:
            self.defaults = [ can(fd) for fd in f.__defaults__ ]
        else:
            self.defaults = None
        self.module = f.__module__ or '__main__'
        self.__name__ = f.__name__
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, FunctionType), "Not a function type"

    def get_object(self, g=None):
        # try to load function back into its module:
        if not self.module.startswith('__'):
            __import__(self.module)
            # note: the module's namespace overrides any caller-passed g
            g = sys.modules[self.module].__dict__

        if g is None:
            g = {}
        if self.defaults:
            defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
        else:
            defaults = None
        newFunc = FunctionType(self.code, g, self.__name__, defaults)
        return newFunc
116 117
class CannedClass(CannedObject):
    """Can a class (typically interactively defined) so it ships by value."""

    def __init__(self, cls):
        self._check_type(cls)
        self.name = cls.__name__
        # py2 old-style (classic) classes have no mro()
        self.old_style = not isinstance(cls, type)
        self._canned_dict = {}
        for attr, value in cls.__dict__.items():
            if attr not in ('__weakref__', '__dict__'):
                self._canned_dict[attr] = can(value)
        mro = [] if self.old_style else cls.mro()
        # can every base except the class itself (mro[0])
        self.parents = [ can(parent) for parent in mro[1:] ]
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, class_type), "Not a class type"

    def get_object(self, g=None):
        parents = tuple(uncan(p, g) for p in self.parents)
        return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
141 142
class CannedArray(CannedObject):
    # Can a numpy array, shipping its data as a raw buffer when possible.
    def __init__(self, obj):
        self.shape = obj.shape
        # structured dtypes need the full descr; plain dtypes just the str
        self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
        if sum(obj.shape) == 0:
            # just pickle it
            # NOTE(review): sum(shape)==0 holds for 0-d arrays and for
            # arrays whose every dimension is 0; shapes like (0, 3) take
            # the buffer path with an empty buffer instead — confirm intent
            self.buffers = [pickle.dumps(obj, -1)]
        else:
            # ensure contiguous, so buffer() captures the data in order
            obj = numpy.ascontiguousarray(obj, dtype=None)
            self.buffers = [buffer(obj)]

    def get_object(self, g=None):
        data = self.buffers[0]
        if sum(self.shape) == 0:
            # no shape, we just pickled it
            return pickle.loads(data)
        else:
            # rebuild the array around the transported buffer (zero-copy)
            return numpy.frombuffer(data, dtype=self.dtype).reshape(self.shape)
161 162
162 163
class CannedBytes(CannedObject):
    """Can a bytes object by carrying it as a single raw buffer."""
    wrap = bytes

    def __init__(self, obj):
        self.buffers = [obj]

    def get_object(self, g=None):
        # rebuild from the (possibly zero-copy) transported buffer
        return self.wrap(self.buffers[0])
171 172
class CannedBuffer(CannedBytes):
    # fix: this was declared with `def` instead of `class`, which made
    # CannedBuffer a plain function rather than a CannedBytes subclass,
    # breaking can_map's `buffer : CannedBuffer` entry
    wrap = buffer
174 175
175 176 #-------------------------------------------------------------------------------
176 177 # Functions
177 178 #-------------------------------------------------------------------------------
178 179
def _logger():
    """get the logger for the current Application

    the root logger will be used if no Application is running
    """
    if Application.initialized():
        return Application.instance().log
    logger = logging.getLogger()
    if not logger.handlers:
        # make sure the root logger has at least a basic handler
        logging.basicConfig()
    return logger
192 193
def _import_mapping(mapping, original=None):
    """import any string-keys in a type mapping

    Keys that fail to import are dropped from the mapping; the failure is
    only logged for keys the user added (i.e. not present in `original`).
    """
    log = _logger()
    log.debug("Importing canning map")
    for key, value in mapping.items():
        if isinstance(key, basestring):
            try:
                cls = import_item(key)
            except Exception:
                if original and key not in original:
                    # only message on user-added classes
                    # fix: log message typo "cannning" -> "canning"
                    log.error("canning class not importable: %r", key, exc_info=True)
                mapping.pop(key)
            else:
                # replace the string key with the imported class
                mapping[cls] = mapping.pop(key)
210 211
def istype(obj, check):
    """like isinstance(obj, check), but strict

    This won't catch subclasses.
    """
    if not isinstance(check, tuple):
        # single type: exact match only
        return type(obj) is check
    # tuple of types: exact match against any one of them
    return any(type(obj) is cls for cls in check)
223 224
def can(obj):
    """prepare an object for pickling"""
    needs_import = False

    for cls, canner in can_map.iteritems():
        if isinstance(cls, basestring):
            # a string key means the map still holds unimported names:
            # resolve them all, then retry from scratch
            needs_import = True
            break
        if istype(obj, cls):
            return canner(obj)

    if needs_import:
        # usually happens at most once per process
        _import_mapping(can_map, _original_can_map)
        return can(obj)

    # nothing special about this object: let pickle handle it as-is
    return obj
243 244
def can_class(obj):
    """can interactively-defined classes (those living in __main__)"""
    if isinstance(obj, class_type) and obj.__module__ == '__main__':
        return CannedClass(obj)
    return obj
249 250
def can_dict(obj):
    """can the *values* of a dict"""
    if not istype(obj, dict):
        # dict subclasses and non-dicts pass through untouched
        return obj
    return dict((key, can(value)) for key, value in obj.iteritems())
259 260
# exact container types whose elements get canned individually
sequence_types = (list, tuple, set)

def can_sequence(obj):
    """can the elements of a sequence"""
    if not istype(obj, sequence_types):
        # subclasses and non-sequences pass through untouched
        return obj
    # rebuild with the same concrete container type
    return type(obj)([can(item) for item in obj])
267 270
def uncan(obj, g=None):
    """invert canning"""
    needs_import = False
    for cls, uncanner in uncan_map.iteritems():
        if isinstance(cls, basestring):
            # unresolved string keys remain: import them, then retry
            needs_import = True
            break
        # note: isinstance (not istype) on purpose — uncanners apply
        # to subclasses of the canned base classes
        if isinstance(obj, cls):
            return uncanner(obj, g)

    if needs_import:
        # usually happens at most once per process
        _import_mapping(uncan_map, _original_uncan_map)
        return uncan(obj, g)

    return obj
286 289
def uncan_dict(obj, g=None):
    """uncan the *values* of a dict"""
    if not istype(obj, dict):
        return obj
    return dict((key, uncan(value, g)) for key, value in obj.iteritems())
295 298
def uncan_sequence(obj, g=None):
    """uncan the elements of a sequence"""
    if not istype(obj, sequence_types):
        return obj
    # rebuild with the same concrete container type
    return type(obj)([uncan(item, g) for item in obj])
302 305
303 306
304 307 #-------------------------------------------------------------------------------
305 308 # API dictionaries
306 309 #-------------------------------------------------------------------------------
307 310
# These dicts can be extended for custom serialization of new objects

# maps type (or import-string of a type) -> canner callable
can_map = {
    'IPython.parallel.dependent' : lambda obj: CannedObject(obj, keys=('f','df')),
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
    class_type : can_class,
}

# maps canned wrapper class -> uncanner callable
uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}

# for use in _import_mapping:
# snapshots taken before any user additions, so _import_mapping can tell
# user-added keys apart from the defaults when reporting import failures
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
General Comments 0
You need to be logged in to leave comments. Login now