##// END OF EJS Templates
use canning hook in dependent...
MinRK -
Show More
@@ -1,223 +1,225 b''
1 1 """Dependency utilities
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2013 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 from types import ModuleType
15 15
16 16 from IPython.parallel.client.asyncresult import AsyncResult
17 17 from IPython.parallel.error import UnmetDependency
18 18 from IPython.parallel.util import interactive
19 19 from IPython.utils import py3compat
20 20 from IPython.utils.pickleutil import can, uncan
21 21
class depend(object):
    """Decorator that attaches an engine-dependency function to a task.

    Use `@depend` the same way you would use `apply`: pass the dependency
    function together with the arguments it should be called with.

    Examples
    --------
    ::

        @depend(df, a, b, c=5)
        def f(m, n, p):
            ...

        view.apply(f, 1, 2, 3)

    On the engine, df(a, b, c=5) is evaluated first; if it returns False
    or raises an UnmetDependency error, the task is not run there and
    another engine is tried instead.
    """
    def __init__(self, f, *args, **kwargs):
        # Record the dependency callable plus the arguments for it.
        self.f = f
        self.args = args
        self.kwargs = kwargs

    def __call__(self, f):
        # Wrap the decorated task in a picklable `dependent` object.
        return dependent(f, self.f, *self.args, **self.kwargs)
49 49
class dependent(object):
    """A function that depends on another function.
    This is an object to prevent the closure used
    in traditional decorators, which are not picklable.
    """

    def __init__(self, f, df, *dargs, **dkwargs):
        # f: the actual task function; df: the dependency-check function,
        # called with dargs/dkwargs when the dependency is verified.
        self.f = f
        self.func_name = getattr(f, '__name__', 'f')
        self.df = df
        self.dargs = dargs
        self.dkwargs = dkwargs

    def check_dependency(self):
        # Evaluate the dependency function; only a strict `is False`
        # result counts as unmet (None/0 do not).  Invoked by the
        # uncanning hook rather than on every call.
        if self.df(*self.dargs, **self.dkwargs) is False:
            raise UnmetDependency()

    def __call__(self, *args, **kwargs):
        # Run the wrapped task itself; dependency checking is separate.
        return self.f(*args, **kwargs)

    if not py3compat.PY3:
        # Python 2 functions expose __name__ via a property so this
        # wrapper still looks like the original function.
        @property
        def __name__(self):
            return self.func_name
@interactive
def _require(*modules, **mapping):
    """Helper for @require decorator."""
    # Runs *on the engine* (hence @interactive): imports the named modules
    # into the user namespace and unpacks any canned objects passed by name.
    from IPython.parallel.error import UnmetDependency
    from IPython.utils.pickleutil import uncan
    user_ns = globals()
    for name in modules:
        try:
            # Python 2 exec-statement form; imports directly into user_ns.
            exec 'import %s' % name in user_ns
        except ImportError:
            raise UnmetDependency(name)

    for name, cobj in mapping.items():
        # Objects arrive canned (see require()); restore them into user_ns.
        user_ns[name] = uncan(cobj, user_ns)
    return True
88 90
def require(*objects, **mapping):
    """Simple decorator for requiring local objects and modules to be available
    when the decorated function is called on the engine.

    Modules given by name or passed directly are imported before the
    decorated function runs.  Non-module objects are pushed as part of the
    task: functions may be passed positionally (pushed under their
    __name__), anything else must be passed by keyword.

    Examples
    --------

    In [1]: @require('numpy')
       ...: def norm(a):
       ...:     return numpy.linalg.norm(a,2)

    In [2]: foo = lambda x: x*x
    In [3]: @require(foo)
       ...: def bar(a):
       ...:     return foo(1-a)
    """
    module_names = []
    for item in objects:
        # Modules travel by name so they can be re-imported on the engine.
        if isinstance(item, ModuleType):
            item = item.__name__

        if isinstance(item, basestring):
            module_names.append(item)
        elif hasattr(item, '__name__'):
            mapping[item.__name__] = item
        else:
            raise TypeError("Objects other than modules and functions "
                "must be passed by kwarg, but got: %s" % type(item)
            )

    # Can every pushed object so it survives pickling to the engine.
    for key in mapping:
        mapping[key] = can(mapping[key])
    return depend(_require, *module_names, **mapping)
130 132
class Dependency(set):
    """An object for representing a set of msg_id dependencies.

    Subclassed from set().

    Parameters
    ----------
    dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
        The msg_ids to depend on
    all : bool [default True]
        Whether the dependency should be considered met when *all* depending tasks have completed
        or only when *any* have been completed.
    success : bool [default True]
        Whether to consider successes as fulfilling dependencies.
    failure : bool [default False]
        Whether to consider failures as fulfilling dependencies.

    If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
    as soon as the first depended-upon task fails.
    """

    # Class-level fallbacks.  `failure` previously defaulted to True here,
    # contradicting both the docstring and the __init__ default; keep all
    # three aligned with the documented defaults.
    all = True
    success = True
    failure = False

    def __init__(self, dependencies=None, all=True, success=True, failure=False):
        # None (not a shared mutable []) is the "no dependencies" default.
        if dependencies is None:
            dependencies = []
        if isinstance(dependencies, dict):
            # load from a dict produced by as_dict() (json round-trip)
            all = dependencies.get('all', True)
            success = dependencies.get('success', success)
            failure = dependencies.get('failure', failure)
            dependencies = dependencies.get('dependencies', [])
        ids = []

        # extract ids from various sources:
        if isinstance(dependencies, (basestring, AsyncResult)):
            dependencies = [dependencies]
        for d in dependencies:
            if isinstance(d, basestring):
                ids.append(d)
            elif isinstance(d, AsyncResult):
                ids.extend(d.msg_ids)
            else:
                raise TypeError("invalid dependency type: %r"%type(d))

        set.__init__(self, ids)
        self.all = all
        if not (success or failure):
            raise ValueError("Must depend on at least one of successes or failures!")
        self.success = success
        self.failure = failure

    def check(self, completed, failed=None):
        """check whether our dependencies have been met."""
        if len(self) == 0:
            # no dependencies: trivially met
            return True
        against = set()
        if self.success:
            against = completed
        if failed is not None and self.failure:
            against = against.union(failed)
        if self.all:
            return self.issubset(against)
        else:
            return not self.isdisjoint(against)

    def unreachable(self, completed, failed=None):
        """return whether this dependency has become impossible."""
        if len(self) == 0:
            return False
        against = set()
        # note the inverted flags: a dependency is unreachable once tasks
        # have finished in a way that we do *not* accept (e.g. failed,
        # when only successes fulfill us).
        if not self.success:
            against = completed
        if failed is not None and not self.failure:
            against = against.union(failed)
        if self.all:
            return not self.isdisjoint(against)
        else:
            return self.issubset(against)

    def as_dict(self):
        """Represent this dependency as a dict. For json compatibility."""
        return dict(
            dependencies=list(self),
            all=self.all,
            success=self.success,
            failure=self.failure
        )
220 222
221 223
222 224 __all__ = ['depend', 'require', 'dependent', 'Dependency']
223 225
@@ -1,333 +1,338 b''
1 1 # encoding: utf-8
2 2
3 3 """Pickle related utilities. Perhaps this should be called 'can'."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import sys
21 21 from types import FunctionType
22 22
23 23 try:
24 24 import cPickle as pickle
25 25 except ImportError:
26 26 import pickle
27 27
28 28 try:
29 29 import numpy
30 30 except:
31 31 numpy = None
32 32
33 33 import codeutil
34 34 import py3compat
35 35 from importstring import import_item
36 36
37 37 from IPython.config import Application
38 38
if py3compat.PY3:
    # Python 3: alias the py2 names so the rest of the module can use
    # `buffer` and a single `class_type` check uniformly.
    buffer = memoryview
    class_type = type
else:
    # Python 2: old-style classes have their own type (ClassType).
    from types import ClassType
    class_type = (type, ClassType)
45 45
46 46 #-------------------------------------------------------------------------------
47 47 # Classes
48 48 #-------------------------------------------------------------------------------
49 49
50 50
class CannedObject(object):
    """Base wrapper that makes an object picklable by canning selected
    attributes, with an optional hook called after uncanning."""

    def __init__(self, obj, keys=[], hook=None):
        # keys: names of attributes of `obj` to can recursively.
        # hook: optional callable invoked as hook(obj, g) after uncanning.
        self.keys = keys
        # shallow copy so the caller's object keeps its original attributes
        self.obj = copy.copy(obj)
        self.hook = can(hook)
        for key in keys:
            setattr(self.obj, key, can(getattr(obj, key)))

        self.buffers = []

    def get_object(self, g=None):
        # g: namespace dict used to resolve names while uncanning.
        if g is None:
            g = {}
        obj = self.obj
        for key in self.keys:
            setattr(obj, key, uncan(getattr(obj, key), g))

        if self.hook:
            # uncan the hook itself, then let it post-process the object
            self.hook = uncan(self.hook, g)
            self.hook(obj, g)
        return self.obj
72 72
73 73
class Reference(CannedObject):
    """object for wrapping a remote reference by name."""

    def __init__(self, name):
        # Only the name is stored; the referent is resolved on the engine.
        if not isinstance(name, basestring):
            raise TypeError("illegal name: %r"%name)
        self.name = name
        self.buffers = []

    def __repr__(self):
        return "<Reference: %r>"%self.name

    def get_object(self, g=None):
        g = {} if g is None else g
        # Look the name up in the supplied namespace at uncan time.
        return eval(self.name, g)
90 90
91 91
class CannedFunction(CannedObject):
    """Canned form of a plain function: stores its code object, canned
    defaults, and module name so it can be rebuilt on the other side.

    NOTE(review): closures are not captured here -- confirm that only
    closure-free functions are expected.
    """

    def __init__(self, f):
        self._check_type(f)
        # Python 2 attribute names (func_code / func_defaults).
        self.code = f.func_code
        if f.func_defaults:
            self.defaults = [ can(fd) for fd in f.func_defaults ]
        else:
            self.defaults = None
        self.module = f.__module__ or '__main__'
        self.__name__ = f.__name__
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, FunctionType), "Not a function type"

    def get_object(self, g=None):
        # try to load function back into its module:
        if not self.module.startswith('__'):
            __import__(self.module)
            g = sys.modules[self.module].__dict__

        if g is None:
            g = {}
        if self.defaults:
            defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
        else:
            defaults = None
        # Rebuild the function with g as its globals namespace.
        newFunc = FunctionType(self.code, g, self.__name__, defaults)
        return newFunc
122 122
class CannedClass(CannedObject):
    """Canned form of an interactively-defined class: cans the class dict
    and parent classes so the class can be rebuilt via type()."""

    def __init__(self, cls):
        self._check_type(cls)
        self.name = cls.__name__
        # old-style (Python 2) classes are not instances of `type`
        self.old_style = not isinstance(cls, type)
        self._canned_dict = {}
        for k,v in cls.__dict__.items():
            if k not in ('__weakref__', '__dict__'):
                self._canned_dict[k] = can(v)
        if self.old_style:
            # old-style classes have no mro()
            mro = []
        else:
            mro = cls.mro()

        # can every ancestor except the class itself (mro[0])
        self.parents = [ can(c) for c in mro[1:] ]
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, class_type), "Not a class type"

    def get_object(self, g=None):
        parents = tuple(uncan(p, g) for p in self.parents)
        return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
147 147
class CannedArray(CannedObject):
    """Canned form of a numpy array: dtype/shape metadata plus a raw
    data buffer (or a pickle for shapeless arrays)."""

    def __init__(self, obj):
        self.shape = obj.shape
        # structured dtypes need the full descr; plain ones just the str
        self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
        if sum(obj.shape) == 0:
            # just pickle it
            # NOTE(review): this covers 0-d and empty-1-d arrays, but a
            # shape like (2, 0) sums to 2 and takes the buffer path --
            # confirm that is intended.
            self.buffers = [pickle.dumps(obj, -1)]
        else:
            # ensure contiguous
            obj = numpy.ascontiguousarray(obj, dtype=None)
            self.buffers = [buffer(obj)]

    def get_object(self, g=None):
        data = self.buffers[0]
        if sum(self.shape) == 0:
            # no shape, we just pickled it
            return pickle.loads(data)
        else:
            # rebuild from the raw buffer without copying the data
            return numpy.frombuffer(data, dtype=self.dtype).reshape(self.shape)
167 167
168 168
class CannedBytes(CannedObject):
    """Canned wrapper for a bytes blob; travels as a raw buffer rather
    than through pickle."""
    wrap = bytes

    def __init__(self, obj):
        # the payload is carried as the single entry in buffers
        self.buffers = [obj]

    def get_object(self, g=None):
        # rewrap the raw buffer in this class's target type
        return self.wrap(self.buffers[0])
177 177
class CannedBuffer(CannedBytes):
    """Like CannedBytes, but rewraps the data as a buffer on uncanning.

    Fix: this was declared with ``def`` instead of ``class``, which made
    CannedBuffer a plain function (with ``wrap = buffer`` as a dead local)
    rather than a CannedBytes subclass, breaking the ``buffer`` entry in
    can_map.
    """
    wrap = buffer
180 180
181 181 #-------------------------------------------------------------------------------
182 182 # Functions
183 183 #-------------------------------------------------------------------------------
184 184
def _logger():
    """get the logger for the current Application

    the root logger will be used if no Application is running
    """
    if not Application.initialized():
        # no running app: fall back to the root logger, attaching basic
        # handlers once if none are configured yet
        root = logging.getLogger()
        if not root.handlers:
            logging.basicConfig()
        return root
    return Application.instance().log
198 198
def _import_mapping(mapping, original=None):
    """import any string-keys in a type mapping

    Modifies `mapping` in place: each dotted-name string key is replaced
    by the class it names (via import_item), or dropped if the import
    fails.

    Parameters
    ----------
    mapping : dict
        a can/uncan type map, possibly containing string keys
    original : dict, optional
        the pristine map; import errors are only logged for keys the
        user added (i.e. not present in `original`)
    """
    log = _logger()
    log.debug("Importing canning map")
    for key,value in mapping.items():
        if isinstance(key, basestring):
            try:
                cls = import_item(key)
            except Exception:
                if original and key not in original:
                    # only message on user-added classes
                    # (fixed typo in the log message: "cannning" -> "canning")
                    log.error("canning class not importable: %r", key, exc_info=True)
                mapping.pop(key)
            else:
                mapping[cls] = mapping.pop(key)
216 216
def istype(obj, check):
    """like isinstance(obj, check), but strict

    This won't catch subclasses.
    """
    actual = type(obj)
    if isinstance(check, tuple):
        # strict identity comparison against each candidate type
        return any(actual is candidate for candidate in check)
    return actual is check
229 229
def can(obj):
    """prepare an object for pickling"""

    import_needed = False

    for cls,canner in can_map.iteritems():
        if isinstance(cls, basestring):
            # a string key means can_map still holds unimported dotted
            # names; resolve them all and retry
            import_needed = True
            break
        elif istype(obj, cls):
            # strict type match (istype): subclasses are not canned here
            return canner(obj)

    if import_needed:
        # perform can_map imports, then try again
        # this will usually only happen once
        _import_mapping(can_map, _original_can_map)
        return can(obj)

    # no canner registered for this type: pass through unchanged
    return obj
249 249
def can_class(obj):
    """Can classes defined interactively in __main__; pass others through."""
    if not (isinstance(obj, class_type) and obj.__module__ == '__main__'):
        return obj
    return CannedClass(obj)
255 255
def can_dict(obj):
    """can the *values* of a dict"""
    if not istype(obj, dict):
        # not a plain dict: leave untouched
        return obj
    canned = {}
    for key, value in obj.iteritems():
        canned[key] = can(value)
    return canned
265 265
266 266 sequence_types = (list, tuple, set)
267 267
def can_sequence(obj):
    """can the elements of a sequence"""
    if istype(obj, sequence_types):
        # rebuild with the same concrete container type
        return type(obj)([can(element) for element in obj])
    return obj
275 275
def uncan(obj, g=None):
    """invert canning"""

    import_needed = False
    for cls,uncanner in uncan_map.iteritems():
        if isinstance(cls, basestring):
            # a string key means uncan_map still holds unimported dotted
            # names; resolve them all and retry
            import_needed = True
            break
        elif isinstance(obj, cls):
            # isinstance (not istype): subclasses of CannedObject are
            # handled by the base entry
            return uncanner(obj, g)

    if import_needed:
        # perform uncan_map imports, then try again
        # this will usually only happen once
        _import_mapping(uncan_map, _original_uncan_map)
        return uncan(obj, g)

    # not a canned object: pass through unchanged
    return obj
294 294
def uncan_dict(obj, g=None):
    """uncan the *values* of a dict"""
    if not istype(obj, dict):
        return obj
    restored = {}
    for key, value in obj.iteritems():
        restored[key] = uncan(value, g)
    return restored
303 303
def uncan_sequence(obj, g=None):
    """uncan the elements of a sequence"""
    if istype(obj, sequence_types):
        # rebuild with the same concrete container type
        return type(obj)([uncan(element, g) for element in obj])
    return obj
310 310
311 def _uncan_dependent_hook(dep, g=None):
312 dep.check_dependency()
313
def can_dependent(obj):
    # Can both the task function (f) and the dependency function (df),
    # attaching a hook that re-checks the dependency when uncanned.
    return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
311 316
312 317 #-------------------------------------------------------------------------------
313 318 # API dictionaries
314 319 #-------------------------------------------------------------------------------
315 320
316 321 # These dicts can be extended for custom serialization of new objects
317 322
# Maps type (or importable dotted name) -> canner callable.
can_map = {
    # canning a dependent re-checks its dependency on the engine
    'IPython.parallel.dependent' : can_dependent,
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
    class_type : can_class,
}

# Maps type -> uncanner; any CannedObject subclass restores via get_object.
uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}

# for use in _import_mapping:
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
General Comments 0
You need to be logged in to leave comments. Login now