"""Dependency utilities""" from IPython.external.decorator import decorator from .asyncresult import AsyncResult from .error import UnmetDependency class depend(object): """Dependency decorator, for use with tasks. `@depend` lets you define a function for engine dependencies just like you use `apply` for tasks. Examples -------- :: @depend(df, a,b, c=5) def f(m,n,p) view.apply(f, 1,2,3) will call df(a,b,c=5) on the engine, and if it returns False or raises an UnmetDependency error, then the task will not be run and another engine will be tried. """ def __init__(self, f, *args, **kwargs): self.f = f self.args = args self.kwargs = kwargs def __call__(self, f): return dependent(f, self.f, *self.args, **self.kwargs) class dependent(object): """A function that depends on another function. This is an object to prevent the closure used in traditional decorators, which are not picklable. """ def __init__(self, f, df, *dargs, **dkwargs): self.f = f self.func_name = getattr(f, '__name__', 'f') self.df = df self.dargs = dargs self.dkwargs = dkwargs def __call__(self, *args, **kwargs): if self.df(*self.dargs, **self.dkwargs) is False: raise UnmetDependency() return self.f(*args, **kwargs) @property def __name__(self): return self.func_name def _require(*names): """Helper for @require decorator.""" for name in names: try: __import__(name) except ImportError: return False return True def require(*names): """Simple decorator for requiring names to be importable. Examples -------- In [1]: @require('numpy') ...: def norm(a): ...: import numpy ...: return numpy.linalg.norm(a,2) """ return depend(_require, *names) class Dependency(set): """An object for representing a set of msg_id dependencies. Subclassed from set(). Parameters ---------- dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict() The msg_ids to depend on all : bool [default True] Whether the dependency should be considered met when *all* depending tasks have completed or only when *any* have been completed. success_only : bool [default True] Whether to consider only successes for Dependencies, or consider failures as well. If `all=success_only=True`, then this task will fail with an ImpossibleDependency as soon as the first depended-upon task fails. """ all=True success_only=True def __init__(self, dependencies=[], all=True, success_only=True): if isinstance(dependencies, dict): # load from dict all = dependencies.get('all', True) success_only = dependencies.get('success_only', success_only) dependencies = dependencies.get('dependencies', []) ids = [] if isinstance(dependencies, AsyncResult): ids.extend(AsyncResult.msg_ids) else: for d in dependencies: if isinstance(d, basestring): ids.append(d) elif isinstance(d, AsyncResult): ids.extend(d.msg_ids) else: raise TypeError("invalid dependency type: %r"%type(d)) set.__init__(self, ids) self.all = all self.success_only=success_only def check(self, completed, failed=None): if failed is not None and not self.success_only: completed = completed.union(failed) if len(self) == 0: return True if self.all: return self.issubset(completed) else: return not self.isdisjoint(completed) def unreachable(self, failed): if len(self) == 0 or len(failed) == 0 or not self.success_only: return False # print self, self.success_only, self.all, failed if self.all: return not self.isdisjoint(failed) else: return self.issubset(failed) def as_dict(self): """Represent this dependency as a dict. For json compatibility.""" return dict( dependencies=list(self), all=self.all, success_only=self.success_only, ) __all__ = ['depend', 'require', 'dependent', 'Dependency']