##// END OF EJS Templates
Merge pull request #3041 from minrk/requirepush...
Brian E. Granger -
r10186:4be107e7 merge
parent child Browse files
Show More
@@ -5,7 +5,7 b' Authors:'
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2013 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
@@ -17,6 +17,7 b' from IPython.parallel.client.asyncresult import AsyncResult'
17 from IPython.parallel.error import UnmetDependency
17 from IPython.parallel.error import UnmetDependency
18 from IPython.parallel.util import interactive
18 from IPython.parallel.util import interactive
19 from IPython.utils import py3compat
19 from IPython.utils import py3compat
20 from IPython.utils.pickleutil import can, uncan
20
21
21 class depend(object):
22 class depend(object):
22 """Dependency decorator, for use with tasks.
23 """Dependency decorator, for use with tasks.
@@ -58,12 +59,12 b' class dependent(object):'
58 self.df = df
59 self.df = df
59 self.dargs = dargs
60 self.dargs = dargs
60 self.dkwargs = dkwargs
61 self.dkwargs = dkwargs
61
62
62 def __call__(self, *args, **kwargs):
63 def check_dependency(self):
63 # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'):
64 # self.df.func_globals = self.f.func_globals
65 if self.df(*self.dargs, **self.dkwargs) is False:
64 if self.df(*self.dargs, **self.dkwargs) is False:
66 raise UnmetDependency()
65 raise UnmetDependency()
66
67 def __call__(self, *args, **kwargs):
67 return self.f(*args, **kwargs)
68 return self.f(*args, **kwargs)
68
69
69 if not py3compat.PY3:
70 if not py3compat.PY3:
@@ -72,41 +73,62 b' class dependent(object):'
72 return self.func_name
73 return self.func_name
73
74
74 @interactive
75 @interactive
75 def _require(*names):
76 def _require(*modules, **mapping):
76 """Helper for @require decorator."""
77 """Helper for @require decorator."""
77 from IPython.parallel.error import UnmetDependency
78 from IPython.parallel.error import UnmetDependency
79 from IPython.utils.pickleutil import uncan
78 user_ns = globals()
80 user_ns = globals()
79 for name in names:
81 for name in modules:
80 if name in user_ns:
81 continue
82 try:
82 try:
83 exec 'import %s'%name in user_ns
83 exec 'import %s' % name in user_ns
84 except ImportError:
84 except ImportError:
85 raise UnmetDependency(name)
85 raise UnmetDependency(name)
86
87 for name, cobj in mapping.items():
88 user_ns[name] = uncan(cobj, user_ns)
86 return True
89 return True
87
90
88 def require(*mods):
91 def require(*objects, **mapping):
89 """Simple decorator for requiring names to be importable.
92 """Simple decorator for requiring local objects and modules to be available
93 when the decorated function is called on the engine.
94
95 Modules specified by name or passed directly will be imported
96 prior to calling the decorated function.
97
98 Objects other than modules will be pushed as a part of the task.
99 Functions can be passed positionally,
100 and will be pushed to the engine with their __name__.
101 Other objects can be passed by keyword arg.
90
102
91 Examples
103 Examples
92 --------
104 --------
93
105
94 In [1]: @require('numpy')
106 In [1]: @require('numpy')
95 ...: def norm(a):
107 ...: def norm(a):
96 ...: import numpy
97 ...: return numpy.linalg.norm(a,2)
108 ...: return numpy.linalg.norm(a,2)
109
110 In [2]: foo = lambda x: x*x
111 In [3]: @require(foo)
112 ...: def bar(a):
113 ...: return foo(1-a)
98 """
114 """
99 names = []
115 names = []
100 for mod in mods:
116 for obj in objects:
101 if isinstance(mod, ModuleType):
117 if isinstance(obj, ModuleType):
102 mod = mod.__name__
118 obj = obj.__name__
103
119
104 if isinstance(mod, basestring):
120 if isinstance(obj, basestring):
105 names.append(mod)
121 names.append(obj)
122 elif hasattr(obj, '__name__'):
123 mapping[obj.__name__] = obj
106 else:
124 else:
107 raise TypeError("names must be modules or module names, not %s"%type(mod))
125 raise TypeError("Objects other than modules and functions "
126 "must be passed by kwarg, but got: %s" % type(obj)
127 )
108
128
109 return depend(_require, *names)
129 for name, obj in mapping.items():
130 mapping[name] = can(obj)
131 return depend(_require, *names, **mapping)
110
132
111 class Dependency(set):
133 class Dependency(set):
112 """An object for representing a set of msg_id dependencies.
134 """An object for representing a set of msg_id dependencies.
@@ -37,6 +37,10 b' def wait(n):'
37 time.sleep(n)
37 time.sleep(n)
38 return n
38 return n
39
39
40 @pmod.interactive
41 def func(x):
42 return x*x
43
40 mixed = map(str, range(10))
44 mixed = map(str, range(10))
41 completed = map(str, range(0,10,2))
45 completed = map(str, range(0,10,2))
42 failed = map(str, range(1,10,2))
46 failed = map(str, range(1,10,2))
@@ -104,3 +108,29 b' class DependencyTest(ClusterTestCase):'
104 dep.all=False
108 dep.all=False
105 self.assertUnmet(dep)
109 self.assertUnmet(dep)
106 self.assertUnreachable(dep)
110 self.assertUnreachable(dep)
111
112 def test_require_function(self):
113
114 @pmod.interactive
115 def bar(a):
116 return func(a)
117
118 @pmod.require(func)
119 @pmod.interactive
120 def bar2(a):
121 return func(a)
122
123 self.client[:].clear()
124 self.assertRaisesRemote(NameError, self.view.apply_sync, bar, 5)
125 ar = self.view.apply_async(bar2, 5)
126 self.assertEqual(ar.get(5), func(5))
127
128 def test_require_object(self):
129
130 @pmod.require(foo=func)
131 @pmod.interactive
132 def bar(a):
133 return foo(a)
134
135 ar = self.view.apply_async(bar, 5)
136 self.assertEqual(ar.get(5), func(5))
@@ -49,9 +49,26 b' else:'
49
49
50
50
51 class CannedObject(object):
51 class CannedObject(object):
52 def __init__(self, obj, keys=[]):
52 def __init__(self, obj, keys=[], hook=None):
53 """can an object for safe pickling
54
55 Parameters
56 ==========
57
58 obj:
59 The object to be canned
60 keys: list (optional)
61 list of attribute names that will be explicitly canned / uncanned
62 hook: callable (optional)
63 An optional extra callable,
64 which can do additional processing of the uncanned object.
65
66 large data may be offloaded into the buffers list,
67 used for zero-copy transfers.
68 """
53 self.keys = keys
69 self.keys = keys
54 self.obj = copy.copy(obj)
70 self.obj = copy.copy(obj)
71 self.hook = can(hook)
55 for key in keys:
72 for key in keys:
56 setattr(self.obj, key, can(getattr(obj, key)))
73 setattr(self.obj, key, can(getattr(obj, key)))
57
74
@@ -60,8 +77,13 b' class CannedObject(object):'
60 def get_object(self, g=None):
77 def get_object(self, g=None):
61 if g is None:
78 if g is None:
62 g = {}
79 g = {}
80 obj = self.obj
63 for key in self.keys:
81 for key in self.keys:
64 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
82 setattr(obj, key, uncan(getattr(obj, key), g))
83
84 if self.hook:
85 self.hook = uncan(self.hook, g)
86 self.hook(obj, g)
65 return self.obj
87 return self.obj
66
88
67
89
@@ -302,6 +324,11 b' def uncan_sequence(obj, g=None):'
302 else:
324 else:
303 return obj
325 return obj
304
326
327 def _uncan_dependent_hook(dep, g=None):
328 dep.check_dependency()
329
330 def can_dependent(obj):
331 return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
305
332
306 #-------------------------------------------------------------------------------
333 #-------------------------------------------------------------------------------
307 # API dictionaries
334 # API dictionaries
@@ -310,7 +337,7 b' def uncan_sequence(obj, g=None):'
310 # These dicts can be extended for custom serialization of new objects
337 # These dicts can be extended for custom serialization of new objects
311
338
312 can_map = {
339 can_map = {
313 'IPython.parallel.dependent' : lambda obj: CannedObject(obj, keys=('f','df')),
340 'IPython.parallel.dependent' : can_dependent,
314 'numpy.ndarray' : CannedArray,
341 'numpy.ndarray' : CannedArray,
315 FunctionType : CannedFunction,
342 FunctionType : CannedFunction,
316 bytes : CannedBytes,
343 bytes : CannedBytes,
General Comments 0
You need to be logged in to leave comments. Login now