dependency.py
229 lines
| 7.2 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """Dependency utilities | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
MinRK
|
r9965 | # Copyright (C) 2013 The IPython Development Team | ||
MinRK
|
r3660 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3546 | |||
MinRK
|
r3665 | from types import ModuleType | ||
Min RK
|
r20860 | from ipython_parallel.client.asyncresult import AsyncResult | ||
from ipython_parallel.error import UnmetDependency | ||||
from ipython_parallel.util import interactive | ||||
Thomas Kluyver
|
r4735 | from IPython.utils import py3compat | ||
Thomas Kluyver
|
r13353 | from IPython.utils.py3compat import string_types | ||
Min RK
|
r21070 | from ipython_kernel.pickleutil import can, uncan | ||
MinRK
|
r3571 | |||
MinRK
|
class depend(object):
    """Decorator that attaches a dependency check to a task function.

    ``@depend`` takes a dependency function and the arguments to call it
    with, in the same way ``apply`` takes a task function and its arguments.

    Examples
    --------
    ::

        @depend(df, a, b, c=5)
        def f(m, n, p):
            ...

        view.apply(f, 1, 2, 3)

    will call ``df(a, b, c=5)`` on the engine, and if it returns False or
    raises an UnmetDependency error, then the task will not be run
    and another engine will be tried.
    """

    def __init__(self, _wrapped_f, *args, **kwargs):
        # Remember the dependency function and its call arguments; they are
        # handed on to `dependent` once the decorator is applied.
        self.f, self.args, self.kwargs = _wrapped_f, args, kwargs

    def __call__(self, f):
        """Return a `dependent` pairing task `f` with the stored check."""
        check_args = self.args
        check_kwargs = self.kwargs
        return dependent(f, self.f, *check_args, **check_kwargs)
class dependent(object):
    """Callable that pairs a task function with a dependency function.

    Written as a class instead of a closure-based decorator, because
    closures are not picklable.
    """

    def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs):
        self.f = _wrapped_f
        wrapped_name = getattr(_wrapped_f, '__name__', 'f')
        # On Python 2, `__name__` is served by the read-only property defined
        # at the bottom of the class, so the value is stored as `func_name`.
        name_attr = '__name__' if py3compat.PY3 else 'func_name'
        setattr(self, name_attr, wrapped_name)
        self.df = _wrapped_df
        self.dargs = dargs
        self.dkwargs = dkwargs

    def check_dependency(self):
        """Call the dependency function; raise UnmetDependency if it returns False.

        Only the literal ``False`` counts as unmet; other falsy results
        (``None``, ``0``, ...) pass.
        """
        outcome = self.df(*self.dargs, **self.dkwargs)
        if outcome is False:
            raise UnmetDependency()

    def __call__(self, *args, **kwargs):
        """Invoke the wrapped task function with the given arguments."""
        return self.f(*args, **kwargs)

    if not py3compat.PY3:
        @property
        def __name__(self):
            """Name of the wrapped function (Python 2 only)."""
            return self.func_name
MinRK
|
r3546 | |||
MinRK
|
@interactive
def _require(*modules, **mapping):
    """Helper for @require decorator.

    Imports every module named in `modules` into this function's global
    namespace, then binds each canned object in `mapping` there (after
    uncanning it) under its keyword name.

    Returns True when everything succeeded; raises UnmetDependency carrying
    the module name as soon as one import fails.
    """
    # Imported inside the body rather than relied on from module scope --
    # presumably so the names resolve wherever this function ends up being
    # executed; confirm before hoisting them.
    from ipython_parallel.error import UnmetDependency
    from ipython_kernel.pickleutil import uncan

    namespace = globals()

    for module_name in modules:
        try:
            exec('import %s' % module_name, namespace)
        except ImportError:
            raise UnmetDependency(module_name)

    for key, canned in mapping.items():
        namespace[key] = uncan(canned, namespace)

    return True
MinRK
|
r3546 | |||
MinRK
|
def require(*objects, **mapping):
    """Simple decorator for requiring local objects and modules to be available
    when the decorated function is called on the engine.

    Modules specified by name or passed directly will be imported
    prior to calling the decorated function.

    Objects other than modules will be pushed as a part of the task.
    Functions can be passed positionally,
    and will be pushed to the engine with their __name__.
    Other objects can be passed by keyword arg.

    Note that a positional object is keyed by its ``__name__``, so a lambda
    (whose ``__name__`` is ``'<lambda>'``) should be passed by keyword instead.

    Raises TypeError for a positional object that is neither a module, a
    string, nor something with a ``__name__``.

    Examples::

        In [1]: @require('numpy')
           ...: def norm(a):
           ...:     return numpy.linalg.norm(a,2)

        In [2]: def foo(x):
           ...:     return x*x
        In [3]: @require(foo)
           ...: def bar(a):
           ...:     return foo(1-a)
    """
    names = []
    for item in objects:
        # A module object is reduced to its importable name.
        target = item.__name__ if isinstance(item, ModuleType) else item

        if isinstance(target, string_types):
            names.append(target)
            continue

        if not hasattr(target, '__name__'):
            raise TypeError("Objects other than modules and functions "
                "must be passed by kwarg, but got: %s" % type(target)
            )
        mapping[target.__name__] = target

    # Everything that is pushed (positional or keyword) travels in canned form.
    canned = {key: can(value) for key, value in mapping.items()}
    return depend(_require, *names, **canned)
MinRK
|
r3548 | |||
class Dependency(set):
    """An object for representing a set of msg_id dependencies.

    Subclassed from set().

    Parameters
    ----------
    dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
        The msg_ids to depend on
    all : bool [default True]
        Whether the dependency should be considered met when *all* depending tasks have completed
        or only when *any* have been completed.
    success : bool [default True]
        Whether to consider successes as fulfilling dependencies.
    failure : bool [default False]
        Whether to consider failures as fulfilling dependencies.

    If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
    as soon as the first depended-upon task fails.

    Raises
    ------
    TypeError
        If an element of `dependencies` is neither a string nor an AsyncResult.
    ValueError
        If both `success` and `failure` are false.
    """

    # Class-level fallbacks; instances built through __init__ shadow all three.
    # NOTE(review): `failure` is True here but defaults to False in __init__
    # and in the docstring -- confirm which one is intended.
    all = True
    success = True
    failure = True

    def __init__(self, dependencies=(), all=True, success=True, failure=False):
        if isinstance(dependencies, dict):
            # Load from a dict (the output of as_dict()).  Keys missing from
            # the dict fall back to the corresponding keyword argument.
            all = dependencies.get('all', all)
            success = dependencies.get('success', success)
            failure = dependencies.get('failure', failure)
            dependencies = dependencies.get('dependencies', [])
        ids = []

        # extract ids from various sources:
        if isinstance(dependencies, string_types + (AsyncResult,)):
            # a single msg_id or AsyncResult is treated as a one-element list
            dependencies = [dependencies]
        for d in dependencies:
            if isinstance(d, string_types):
                ids.append(d)
            elif isinstance(d, AsyncResult):
                ids.extend(d.msg_ids)
            else:
                raise TypeError("invalid dependency type: %r" % type(d))

        set.__init__(self, ids)
        self.all = all
        if not (success or failure):
            raise ValueError("Must depend on at least one of successes or failures!")
        self.success = success
        self.failure = failure

    def check(self, completed, failed=None):
        """check whether our dependencies have been met.

        `completed` and `failed` are collections of msg_ids; any iterable
        is accepted.  An empty Dependency is always met.
        """
        if not self:
            return True
        against = set()
        if self.success:
            against = completed
        if failed is not None and self.failure:
            # set().union(...) takes arbitrary iterables, so neither argument
            # has to be a set; it builds one new set, leaving the inputs alone.
            against = set().union(against, failed)
        if self.all:
            return self.issubset(against)
        return not self.isdisjoint(against)

    def unreachable(self, completed, failed=None):
        """return whether this dependency has become impossible.

        `completed` and `failed` are collections of msg_ids; any iterable
        is accepted.  An empty Dependency is never unreachable.
        """
        if not self:
            return False
        # Here `against` collects the outcomes that do NOT count as fulfilling.
        against = set()
        if not self.success:
            against = completed
        if failed is not None and not self.failure:
            against = set().union(against, failed)
        if self.all:
            return not self.isdisjoint(against)
        return self.issubset(against)

    def as_dict(self):
        """Represent this dependency as a dict. For json compatibility."""
        return dict(
            dependencies=list(self),
            all=self.all,
            success=self.success,
            failure=self.failure,
        )
MinRK
|
r3664 | |||
MinRK
|
r3548 | |||
MinRK
|
# Names exported by a star-import of this module.
__all__ = [
    'depend',
    'require',
    'dependent',
    'Dependency',
]
MinRK
|
r3546 | |||