Show More
@@ -5,7 +5,7 b' Authors:' | |||||
5 | * Min RK |
|
5 | * Min RK | |
6 | """ |
|
6 | """ | |
7 | #----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 |
# Copyright (C) 201 |
|
8 | # Copyright (C) 2013 The IPython Development Team | |
9 | # |
|
9 | # | |
10 | # Distributed under the terms of the BSD License. The full license is in |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
11 | # the file COPYING, distributed as part of this software. |
|
11 | # the file COPYING, distributed as part of this software. | |
@@ -17,6 +17,7 b' from IPython.parallel.client.asyncresult import AsyncResult' | |||||
17 | from IPython.parallel.error import UnmetDependency |
|
17 | from IPython.parallel.error import UnmetDependency | |
18 | from IPython.parallel.util import interactive |
|
18 | from IPython.parallel.util import interactive | |
19 | from IPython.utils import py3compat |
|
19 | from IPython.utils import py3compat | |
|
20 | from IPython.utils.pickleutil import can, uncan | |||
20 |
|
21 | |||
21 | class depend(object): |
|
22 | class depend(object): | |
22 | """Dependency decorator, for use with tasks. |
|
23 | """Dependency decorator, for use with tasks. | |
@@ -58,12 +59,12 b' class dependent(object):' | |||||
58 | self.df = df |
|
59 | self.df = df | |
59 | self.dargs = dargs |
|
60 | self.dargs = dargs | |
60 | self.dkwargs = dkwargs |
|
61 | self.dkwargs = dkwargs | |
61 |
|
62 | |||
62 | def __call__(self, *args, **kwargs): |
|
63 | def check_dependency(self): | |
63 | # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'): |
|
|||
64 | # self.df.func_globals = self.f.func_globals |
|
|||
65 | if self.df(*self.dargs, **self.dkwargs) is False: |
|
64 | if self.df(*self.dargs, **self.dkwargs) is False: | |
66 | raise UnmetDependency() |
|
65 | raise UnmetDependency() | |
|
66 | ||||
|
67 | def __call__(self, *args, **kwargs): | |||
67 | return self.f(*args, **kwargs) |
|
68 | return self.f(*args, **kwargs) | |
68 |
|
69 | |||
69 | if not py3compat.PY3: |
|
70 | if not py3compat.PY3: | |
@@ -72,41 +73,62 b' class dependent(object):' | |||||
72 | return self.func_name |
|
73 | return self.func_name | |
73 |
|
74 | |||
74 | @interactive |
|
75 | @interactive | |
75 |
def _require(* |
|
76 | def _require(*modules, **mapping): | |
76 | """Helper for @require decorator.""" |
|
77 | """Helper for @require decorator.""" | |
77 | from IPython.parallel.error import UnmetDependency |
|
78 | from IPython.parallel.error import UnmetDependency | |
|
79 | from IPython.utils.pickleutil import uncan | |||
78 | user_ns = globals() |
|
80 | user_ns = globals() | |
79 |
for name in |
|
81 | for name in modules: | |
80 | if name in user_ns: |
|
|||
81 | continue |
|
|||
82 | try: |
|
82 | try: | |
83 | exec 'import %s'%name in user_ns |
|
83 | exec 'import %s' % name in user_ns | |
84 | except ImportError: |
|
84 | except ImportError: | |
85 | raise UnmetDependency(name) |
|
85 | raise UnmetDependency(name) | |
|
86 | ||||
|
87 | for name, cobj in mapping.items(): | |||
|
88 | user_ns[name] = uncan(cobj, user_ns) | |||
86 | return True |
|
89 | return True | |
87 |
|
90 | |||
88 |
def require(* |
|
91 | def require(*objects, **mapping): | |
89 |
"""Simple decorator for requiring |
|
92 | """Simple decorator for requiring local objects and modules to be available | |
|
93 | when the decorated function is called on the engine. | |||
|
94 | ||||
|
95 | Modules specified by name or passed directly will be imported | |||
|
96 | prior to calling the decorated function. | |||
|
97 | ||||
|
98 | Objects other than modules will be pushed as a part of the task. | |||
|
99 | Functions can be passed positionally, | |||
|
100 | and will be pushed to the engine with their __name__. | |||
|
101 | Other objects can be passed by keyword arg. | |||
90 |
|
102 | |||
91 | Examples |
|
103 | Examples | |
92 | -------- |
|
104 | -------- | |
93 |
|
105 | |||
94 | In [1]: @require('numpy') |
|
106 | In [1]: @require('numpy') | |
95 | ...: def norm(a): |
|
107 | ...: def norm(a): | |
96 | ...: import numpy |
|
|||
97 | ...: return numpy.linalg.norm(a,2) |
|
108 | ...: return numpy.linalg.norm(a,2) | |
|
109 | ||||
|
110 | In [2]: foo = lambda x: x*x | |||
|
111 | In [3]: @require(foo) | |||
|
112 | ...: def bar(a): | |||
|
113 | ...: return foo(1-a) | |||
98 | """ |
|
114 | """ | |
99 | names = [] |
|
115 | names = [] | |
100 |
for |
|
116 | for obj in objects: | |
101 |
if isinstance( |
|
117 | if isinstance(obj, ModuleType): | |
102 |
|
|
118 | obj = obj.__name__ | |
103 |
|
119 | |||
104 |
if isinstance( |
|
120 | if isinstance(obj, basestring): | |
105 |
names.append( |
|
121 | names.append(obj) | |
|
122 | elif hasattr(obj, '__name__'): | |||
|
123 | mapping[obj.__name__] = obj | |||
106 | else: |
|
124 | else: | |
107 | raise TypeError("names must be modules or module names, not %s"%type(mod)) |
|
125 | raise TypeError("Objects other than modules and functions " | |
|
126 | "must be passed by kwarg, but got: %s" % type(obj) | |||
|
127 | ) | |||
108 |
|
128 | |||
109 | return depend(_require, *names) |
|
129 | for name, obj in mapping.items(): | |
|
130 | mapping[name] = can(obj) | |||
|
131 | return depend(_require, *names, **mapping) | |||
110 |
|
132 | |||
111 | class Dependency(set): |
|
133 | class Dependency(set): | |
112 | """An object for representing a set of msg_id dependencies. |
|
134 | """An object for representing a set of msg_id dependencies. |
@@ -37,6 +37,10 b' def wait(n):' | |||||
37 | time.sleep(n) |
|
37 | time.sleep(n) | |
38 | return n |
|
38 | return n | |
39 |
|
39 | |||
|
40 | @pmod.interactive | |||
|
41 | def func(x): | |||
|
42 | return x*x | |||
|
43 | ||||
40 | mixed = map(str, range(10)) |
|
44 | mixed = map(str, range(10)) | |
41 | completed = map(str, range(0,10,2)) |
|
45 | completed = map(str, range(0,10,2)) | |
42 | failed = map(str, range(1,10,2)) |
|
46 | failed = map(str, range(1,10,2)) | |
@@ -104,3 +108,29 b' class DependencyTest(ClusterTestCase):' | |||||
104 | dep.all=False |
|
108 | dep.all=False | |
105 | self.assertUnmet(dep) |
|
109 | self.assertUnmet(dep) | |
106 | self.assertUnreachable(dep) |
|
110 | self.assertUnreachable(dep) | |
|
111 | ||||
|
112 | def test_require_function(self): | |||
|
113 | ||||
|
114 | @pmod.interactive | |||
|
115 | def bar(a): | |||
|
116 | return func(a) | |||
|
117 | ||||
|
118 | @pmod.require(func) | |||
|
119 | @pmod.interactive | |||
|
120 | def bar2(a): | |||
|
121 | return func(a) | |||
|
122 | ||||
|
123 | self.client[:].clear() | |||
|
124 | self.assertRaisesRemote(NameError, self.view.apply_sync, bar, 5) | |||
|
125 | ar = self.view.apply_async(bar2, 5) | |||
|
126 | self.assertEqual(ar.get(5), func(5)) | |||
|
127 | ||||
|
128 | def test_require_object(self): | |||
|
129 | ||||
|
130 | @pmod.require(foo=func) | |||
|
131 | @pmod.interactive | |||
|
132 | def bar(a): | |||
|
133 | return foo(a) | |||
|
134 | ||||
|
135 | ar = self.view.apply_async(bar, 5) | |||
|
136 | self.assertEqual(ar.get(5), func(5)) |
@@ -49,9 +49,26 b' else:' | |||||
49 |
|
49 | |||
50 |
|
50 | |||
51 | class CannedObject(object): |
|
51 | class CannedObject(object): | |
52 | def __init__(self, obj, keys=[]): |
|
52 | def __init__(self, obj, keys=[], hook=None): | |
|
53 | """can an object for safe pickling | |||
|
54 | ||||
|
55 | Parameters | |||
|
56 | ========== | |||
|
57 | ||||
|
58 | obj: | |||
|
59 | The object to be canned | |||
|
60 | keys: list (optional) | |||
|
61 | list of attribute names that will be explicitly canned / uncanned | |||
|
62 | hook: callable (optional) | |||
|
63 | An optional extra callable, | |||
|
64 | which can do additional processing of the uncanned object. | |||
|
65 | ||||
|
66 | large data may be offloaded into the buffers list, | |||
|
67 | used for zero-copy transfers. | |||
|
68 | """ | |||
53 | self.keys = keys |
|
69 | self.keys = keys | |
54 | self.obj = copy.copy(obj) |
|
70 | self.obj = copy.copy(obj) | |
|
71 | self.hook = can(hook) | |||
55 | for key in keys: |
|
72 | for key in keys: | |
56 | setattr(self.obj, key, can(getattr(obj, key))) |
|
73 | setattr(self.obj, key, can(getattr(obj, key))) | |
57 |
|
74 | |||
@@ -60,8 +77,13 b' class CannedObject(object):' | |||||
60 | def get_object(self, g=None): |
|
77 | def get_object(self, g=None): | |
61 | if g is None: |
|
78 | if g is None: | |
62 | g = {} |
|
79 | g = {} | |
|
80 | obj = self.obj | |||
63 | for key in self.keys: |
|
81 | for key in self.keys: | |
64 |
setattr( |
|
82 | setattr(obj, key, uncan(getattr(obj, key), g)) | |
|
83 | ||||
|
84 | if self.hook: | |||
|
85 | self.hook = uncan(self.hook, g) | |||
|
86 | self.hook(obj, g) | |||
65 | return self.obj |
|
87 | return self.obj | |
66 |
|
88 | |||
67 |
|
89 | |||
@@ -302,6 +324,11 b' def uncan_sequence(obj, g=None):' | |||||
302 | else: |
|
324 | else: | |
303 | return obj |
|
325 | return obj | |
304 |
|
326 | |||
|
327 | def _uncan_dependent_hook(dep, g=None): | |||
|
328 | dep.check_dependency() | |||
|
329 | ||||
|
330 | def can_dependent(obj): | |||
|
331 | return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook) | |||
305 |
|
332 | |||
306 | #------------------------------------------------------------------------------- |
|
333 | #------------------------------------------------------------------------------- | |
307 | # API dictionaries |
|
334 | # API dictionaries | |
@@ -310,7 +337,7 b' def uncan_sequence(obj, g=None):' | |||||
310 | # These dicts can be extended for custom serialization of new objects |
|
337 | # These dicts can be extended for custom serialization of new objects | |
311 |
|
338 | |||
312 | can_map = { |
|
339 | can_map = { | |
313 |
'IPython.parallel.dependent' : |
|
340 | 'IPython.parallel.dependent' : can_dependent, | |
314 | 'numpy.ndarray' : CannedArray, |
|
341 | 'numpy.ndarray' : CannedArray, | |
315 | FunctionType : CannedFunction, |
|
342 | FunctionType : CannedFunction, | |
316 | bytes : CannedBytes, |
|
343 | bytes : CannedBytes, |
General Comments 0
You need to be logged in to leave comments.
Login now