|
|
"""Dependency utilities
|
|
|
|
|
|
Authors:
|
|
|
|
|
|
* Min RK
|
|
|
"""
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2013 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
from types import ModuleType
|
|
|
|
|
|
from IPython.parallel.client.asyncresult import AsyncResult
|
|
|
from IPython.parallel.error import UnmetDependency
|
|
|
from IPython.parallel.util import interactive
|
|
|
from IPython.utils import py3compat
|
|
|
from IPython.utils.py3compat import string_types
|
|
|
from IPython.utils.pickleutil import can, uncan
|
|
|
|
|
|
class depend(object):
|
|
|
"""Dependency decorator, for use with tasks.
|
|
|
|
|
|
`@depend` lets you define a function for engine dependencies
|
|
|
just like you use `apply` for tasks.
|
|
|
|
|
|
|
|
|
Examples
|
|
|
--------
|
|
|
::
|
|
|
|
|
|
@depend(df, a,b, c=5)
|
|
|
def f(m,n,p)
|
|
|
|
|
|
view.apply(f, 1,2,3)
|
|
|
|
|
|
will call df(a,b,c=5) on the engine, and if it returns False or
|
|
|
raises an UnmetDependency error, then the task will not be run
|
|
|
and another engine will be tried.
|
|
|
"""
|
|
|
def __init__(self, _wrapped_f, *args, **kwargs):
|
|
|
self.f = _wrapped_f
|
|
|
self.args = args
|
|
|
self.kwargs = kwargs
|
|
|
|
|
|
def __call__(self, f):
|
|
|
return dependent(f, self.f, *self.args, **self.kwargs)
|
|
|
|
|
|
class dependent(object):
|
|
|
"""A function that depends on another function.
|
|
|
This is an object to prevent the closure used
|
|
|
in traditional decorators, which are not picklable.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs):
|
|
|
self.f = _wrapped_f
|
|
|
name = getattr(f, '__name__', 'f')
|
|
|
if py3compat.PY3:
|
|
|
self.__name__ = name
|
|
|
else:
|
|
|
self.func_name = name
|
|
|
self.df = _wrapped_df
|
|
|
self.dargs = dargs
|
|
|
self.dkwargs = dkwargs
|
|
|
|
|
|
def check_dependency(self):
|
|
|
if self.df(*self.dargs, **self.dkwargs) is False:
|
|
|
raise UnmetDependency()
|
|
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
|
return self.f(*args, **kwargs)
|
|
|
|
|
|
if not py3compat.PY3:
|
|
|
@property
|
|
|
def __name__(self):
|
|
|
return self.func_name
|
|
|
|
|
|
@interactive
|
|
|
def _require(*modules, **mapping):
|
|
|
"""Helper for @require decorator."""
|
|
|
from IPython.parallel.error import UnmetDependency
|
|
|
from IPython.utils.pickleutil import uncan
|
|
|
user_ns = globals()
|
|
|
for name in modules:
|
|
|
try:
|
|
|
exec('import %s' % name, user_ns)
|
|
|
except ImportError:
|
|
|
raise UnmetDependency(name)
|
|
|
|
|
|
for name, cobj in mapping.items():
|
|
|
user_ns[name] = uncan(cobj, user_ns)
|
|
|
return True
|
|
|
|
|
|
def require(*objects, **mapping):
|
|
|
"""Simple decorator for requiring local objects and modules to be available
|
|
|
when the decorated function is called on the engine.
|
|
|
|
|
|
Modules specified by name or passed directly will be imported
|
|
|
prior to calling the decorated function.
|
|
|
|
|
|
Objects other than modules will be pushed as a part of the task.
|
|
|
Functions can be passed positionally,
|
|
|
and will be pushed to the engine with their __name__.
|
|
|
Other objects can be passed by keyword arg.
|
|
|
|
|
|
Examples::
|
|
|
|
|
|
In [1]: @require('numpy')
|
|
|
...: def norm(a):
|
|
|
...: return numpy.linalg.norm(a,2)
|
|
|
|
|
|
In [2]: foo = lambda x: x*x
|
|
|
In [3]: @require(foo)
|
|
|
...: def bar(a):
|
|
|
...: return foo(1-a)
|
|
|
"""
|
|
|
names = []
|
|
|
for obj in objects:
|
|
|
if isinstance(obj, ModuleType):
|
|
|
obj = obj.__name__
|
|
|
|
|
|
if isinstance(obj, string_types):
|
|
|
names.append(obj)
|
|
|
elif hasattr(obj, '__name__'):
|
|
|
mapping[obj.__name__] = obj
|
|
|
else:
|
|
|
raise TypeError("Objects other than modules and functions "
|
|
|
"must be passed by kwarg, but got: %s" % type(obj)
|
|
|
)
|
|
|
|
|
|
for name, obj in mapping.items():
|
|
|
mapping[name] = can(obj)
|
|
|
return depend(_require, *names, **mapping)
|
|
|
|
|
|
class Dependency(set):
|
|
|
"""An object for representing a set of msg_id dependencies.
|
|
|
|
|
|
Subclassed from set().
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
|
|
|
The msg_ids to depend on
|
|
|
all : bool [default True]
|
|
|
Whether the dependency should be considered met when *all* depending tasks have completed
|
|
|
or only when *any* have been completed.
|
|
|
success : bool [default True]
|
|
|
Whether to consider successes as fulfilling dependencies.
|
|
|
failure : bool [default False]
|
|
|
Whether to consider failures as fulfilling dependencies.
|
|
|
|
|
|
If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
|
|
|
as soon as the first depended-upon task fails.
|
|
|
"""
|
|
|
|
|
|
all=True
|
|
|
success=True
|
|
|
failure=True
|
|
|
|
|
|
def __init__(self, dependencies=[], all=True, success=True, failure=False):
|
|
|
if isinstance(dependencies, dict):
|
|
|
# load from dict
|
|
|
all = dependencies.get('all', True)
|
|
|
success = dependencies.get('success', success)
|
|
|
failure = dependencies.get('failure', failure)
|
|
|
dependencies = dependencies.get('dependencies', [])
|
|
|
ids = []
|
|
|
|
|
|
# extract ids from various sources:
|
|
|
if isinstance(dependencies, string_types + (AsyncResult,)):
|
|
|
dependencies = [dependencies]
|
|
|
for d in dependencies:
|
|
|
if isinstance(d, string_types):
|
|
|
ids.append(d)
|
|
|
elif isinstance(d, AsyncResult):
|
|
|
ids.extend(d.msg_ids)
|
|
|
else:
|
|
|
raise TypeError("invalid dependency type: %r"%type(d))
|
|
|
|
|
|
set.__init__(self, ids)
|
|
|
self.all = all
|
|
|
if not (success or failure):
|
|
|
raise ValueError("Must depend on at least one of successes or failures!")
|
|
|
self.success=success
|
|
|
self.failure = failure
|
|
|
|
|
|
def check(self, completed, failed=None):
|
|
|
"""check whether our dependencies have been met."""
|
|
|
if len(self) == 0:
|
|
|
return True
|
|
|
against = set()
|
|
|
if self.success:
|
|
|
against = completed
|
|
|
if failed is not None and self.failure:
|
|
|
against = against.union(failed)
|
|
|
if self.all:
|
|
|
return self.issubset(against)
|
|
|
else:
|
|
|
return not self.isdisjoint(against)
|
|
|
|
|
|
def unreachable(self, completed, failed=None):
|
|
|
"""return whether this dependency has become impossible."""
|
|
|
if len(self) == 0:
|
|
|
return False
|
|
|
against = set()
|
|
|
if not self.success:
|
|
|
against = completed
|
|
|
if failed is not None and not self.failure:
|
|
|
against = against.union(failed)
|
|
|
if self.all:
|
|
|
return not self.isdisjoint(against)
|
|
|
else:
|
|
|
return self.issubset(against)
|
|
|
|
|
|
|
|
|
def as_dict(self):
|
|
|
"""Represent this dependency as a dict. For json compatibility."""
|
|
|
return dict(
|
|
|
dependencies=list(self),
|
|
|
all=self.all,
|
|
|
success=self.success,
|
|
|
failure=self.failure
|
|
|
)
|
|
|
|
|
|
|
|
|
__all__ = ['depend', 'require', 'dependent', 'Dependency']
|
|
|
|
|
|
|