dependency.py
201 lines
| 6.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Dependency utilities

Authors:

* Min RK
"""
MinRK
|
#-----------------------------------------------------------------------------
#  Copyright (C) 2010-2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
|
r3546 | |||
MinRK
|
r3665 | from types import ModuleType | ||
MinRK
|
r3673 | from IPython.parallel.client.asyncresult import AsyncResult | ||
from IPython.parallel.error import UnmetDependency | ||||
from IPython.parallel.util import interactive | ||||
MinRK
|
r3571 | |||
MinRK
|
class depend(object):
    """Decorator attaching an engine-side dependency check to a task.

    `@depend` takes a dependency function plus its arguments, in the same
    way `apply` takes a task function plus its arguments.

    Examples
    --------
    ::

        @depend(df, a, b, c=5)
        def f(m, n, p):
            ...

        view.apply(f, 1, 2, 3)

    will call df(a, b, c=5) on the engine, and if it returns False or
    raises an UnmetDependency error, then the task will not be run
    and another engine will be tried.
    """

    def __init__(self, f, *args, **kwargs):
        # `f` here is the dependency function, not the task being decorated.
        self.f, self.args, self.kwargs = f, args, kwargs

    def __call__(self, f):
        # `f` here is the task; pair it with the stored dependency call.
        wrapped = dependent(f, self.f, *self.args, **self.kwargs)
        return wrapped
class dependent(object):
    """Callable that runs a dependency function before the wrapped function.

    Written as a class instead of a closure-based decorator, because
    closures are not picklable.
    """

    def __init__(self, f, df, *dargs, **dkwargs):
        self.f = f
        self.df = df
        self.dargs = dargs
        self.dkwargs = dkwargs
        # Remember the wrapped callable's name; fall back to 'f' when it
        # has no __name__ attribute.
        self.func_name = getattr(f, '__name__', 'f')

    def __call__(self, *args, **kwargs):
        # NOTE: sharing f's globals with df is disabled, kept for reference:
        # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'):
        #     self.df.func_globals = self.f.func_globals
        met = self.df(*self.dargs, **self.dkwargs)
        # Only a literal False counts as unmet; None or other falsy
        # results still allow the wrapped function to run.
        if met is False:
            raise UnmetDependency()
        return self.f(*args, **kwargs)

    @property
    def __name__(self):
        """Name of the wrapped function, as captured at construction."""
        return self.func_name
MinRK
|
r3546 | |||
MinRK
|
@interactive
def _require(*names):
    """Helper for @require decorator.

    Import every module named in `names` into the namespace returned by
    ``globals()``. Names already present in that namespace are skipped.

    Returns True when all imports succeed; raises UnmetDependency(name)
    for the first name whose import raises ImportError.
    """
    # Imported inside the function rather than relied on from module scope.
    # NOTE(review): presumably so the helper is self-contained when run
    # against another namespace via @interactive -- confirm.
    from IPython.parallel.error import UnmetDependency

    user_ns = globals()
    for name in names:
        if name not in user_ns:
            try:
                # NOTE(review): `name` is interpolated into executed source;
                # require() only checks that it is a string.
                exec('import %s' % name, user_ns)
            except ImportError:
                raise UnmetDependency(name)
    return True
MinRK
|
r3546 | |||
MinRK
|
def require(*mods):
    """Simple decorator for requiring names to be importable.

    Each argument may be a module object or a module name; module objects
    are reduced to their ``__name__``. Anything else raises TypeError.

    Examples
    --------

    In [1]: @require('numpy')
       ...: def norm(a):
       ...:     import numpy
       ...:     return numpy.linalg.norm(a,2)
    """
    names = []
    for mod in mods:
        # Normalise module objects to their dotted name first.
        name = mod.__name__ if isinstance(mod, ModuleType) else mod
        if not isinstance(name, basestring):
            raise TypeError("names must be modules or module names, not %s"%type(name))
        names.append(name)
    return depend(_require, *names)
class Dependency(set):
    """An object for representing a set of msg_id dependencies.

    Subclassed from set().

    Parameters
    ----------
    dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
        The msg_ids to depend on
    all : bool [default True]
        Whether the dependency should be considered met when *all* depending tasks have completed
        or only when *any* have been completed.
    success : bool [default True]
        Whether to consider successes as fulfilling dependencies.
    failure : bool [default False]
        Whether to consider failures as fulfilling dependencies.

    If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
    as soon as the first depended-upon task fails.

    Raises
    ------
    TypeError
        If an element of `dependencies` is neither a string nor an AsyncResult.
    ValueError
        If both `success` and `failure` are false.
    """

    # Class-level defaults; kept identical to the constructor defaults.
    all = True
    success = True
    failure = False

    def __init__(self, dependencies=(), all=True, success=True, failure=False):
        if isinstance(dependencies, dict):
            # load from dict (the format produced by as_dict()); the keyword
            # arguments act as fallbacks for any key the dict does not carry
            all = dependencies.get('all', all)
            success = dependencies.get('success', success)
            failure = dependencies.get('failure', failure)
            dependencies = dependencies.get('dependencies', [])
        ids = []

        # extract ids from various sources:
        # a lone msg_id or AsyncResult is treated as a one-element collection
        if isinstance(dependencies, (basestring, AsyncResult)):
            dependencies = [dependencies]
        for d in dependencies:
            if isinstance(d, basestring):
                ids.append(d)
            elif isinstance(d, AsyncResult):
                # an AsyncResult contributes all of its msg_ids
                ids.extend(d.msg_ids)
            else:
                raise TypeError("invalid dependency type: %r"%type(d))

        set.__init__(self, ids)
        self.all = all
        if not (success or failure):
            raise ValueError("Must depend on at least one of successes or failures!")
        self.success = success
        self.failure = failure

    def check(self, completed, failed=None):
        """check whether our dependencies have been met.

        `completed` and `failed` are collections of msg_ids; `completed`
        must provide ``union`` when both success and failure count.
        An empty Dependency is always met.
        """
        if not self:
            return True
        # collect the msg_ids whose outcome counts towards fulfilment
        against = set()
        if self.success:
            against = completed
        if failed is not None and self.failure:
            against = against.union(failed)
        if self.all:
            return self.issubset(against)
        return not self.isdisjoint(against)

    def unreachable(self, completed, failed=None):
        """return whether this dependency has become impossible.

        An empty Dependency is never unreachable.
        """
        if not self:
            return False
        # collect the msg_ids whose outcome can never fulfil this dependency
        against = set()
        if not self.success:
            against = completed
        if failed is not None and not self.failure:
            against = against.union(failed)
        if self.all:
            # one wrong outcome is enough to make 'all' impossible
            return not self.isdisjoint(against)
        # 'any' is impossible only once every id has the wrong outcome
        return self.issubset(against)

    def as_dict(self):
        """Represent this dependency as a dict. For json compatibility."""
        return dict(
            dependencies=list(self),
            all=self.all,
            success=self.success,
            failure=self.failure,
        )
MinRK
|
r3664 | |||
MinRK
|
r3548 | |||
MinRK
|
# Public names exported by this module.
__all__ = [
    'depend',
    'require',
    'dependent',
    'Dependency',
]
MinRK
|
r3546 | |||