##// END OF EJS Templates
Adding test for rollback
Adding test for rollback

File last commit:

r21070:1fdd9cb1
r21181:3850233f
Show More
dependency.py
229 lines | 7.2 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Dependency utilities
Authors:
* Min RK
"""
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
MinRK
support non-modules in @require...
r9965 # Copyright (C) 2013 The IPython Development Team
MinRK
copyright statements
r3660 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added dependency decorator
r3546
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 from types import ModuleType
Min RK
s/IPython.parallel/ipython_parallel/
r20860 from ipython_parallel.client.asyncresult import AsyncResult
from ipython_parallel.error import UnmetDependency
from ipython_parallel.util import interactive
Thomas Kluyver
More Python 3 compatibility fixes.
r4735 from IPython.utils import py3compat
Thomas Kluyver
Replace references to unicode and basestring
r13353 from IPython.utils.py3compat import string_types
Min RK
update pickleutil imports
r21070 from ipython_kernel.pickleutil import can, uncan
MinRK
added basic tunneling with ssh or paramiko
r3571
MinRK
added dependency decorator
r3546 class depend(object):
MinRK
cleanup pass
r3644 """Dependency decorator, for use with tasks.
`@depend` lets you define a function for engine dependencies
just like you use `apply` for tasks.
Examples
--------
::
@depend(df, a,b, c=5)
def f(m,n,p)
view.apply(f, 1,2,3)
will call df(a,b,c=5) on the engine, and if it returns False or
raises an UnmetDependency error, then the task will not be run
and another engine will be tried.
"""
MinRK
don't use common names in require decorators...
r16420 def __init__(self, _wrapped_f, *args, **kwargs):
self.f = _wrapped_f
MinRK
added dependency decorator
r3546 self.args = args
self.kwargs = kwargs
def __call__(self, f):
return dependent(f, self.f, *self.args, **self.kwargs)
class dependent(object):
"""A function that depends on another function.
This is an object to prevent the closure used
in traditional decorators, which are not picklable.
"""
MinRK
don't use common names in require decorators...
r16420 def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs):
self.f = _wrapped_f
Thomas Kluyver
Fix a NameError in IPython.parallel
r16442 name = getattr(_wrapped_f, '__name__', 'f')
Thomas Kluyver
Fix parallel test suite
r13383 if py3compat.PY3:
self.__name__ = name
else:
self.func_name = name
MinRK
don't use common names in require decorators...
r16420 self.df = _wrapped_df
MinRK
added dependency decorator
r3546 self.dargs = dargs
self.dkwargs = dkwargs
MinRK
use canning hook in dependent...
r9986
def check_dependency(self):
MinRK
added dependency decorator
r3546 if self.df(*self.dargs, **self.dkwargs) is False:
raise UnmetDependency()
MinRK
use canning hook in dependent...
r9986
def __call__(self, *args, **kwargs):
MinRK
added dependency decorator
r3546 return self.f(*args, **kwargs)
MinRK
Improvements to dependency handling...
r3607
Thomas Kluyver
More Python 3 compatibility fixes.
r4735 if not py3compat.PY3:
@property
def __name__(self):
MinRK
fix Python2 infinite recursion on __name__
r13400 return self.func_name
MinRK
added dependency decorator
r3546
MinRK
update API after sagedays29...
r3664 @interactive
MinRK
support non-modules in @require...
r9965 def _require(*modules, **mapping):
MinRK
cleanup pass
r3644 """Helper for @require decorator."""
Min RK
s/IPython.parallel/ipython_parallel/
r20860 from ipython_parallel.error import UnmetDependency
Min RK
update pickleutil imports
r21070 from ipython_kernel.pickleutil import uncan
MinRK
update API after sagedays29...
r3664 user_ns = globals()
MinRK
support non-modules in @require...
r9965 for name in modules:
MinRK
added dependencies & Python scheduler
r3548 try:
Thomas Kluyver
Fix exec statements for Py 3...
r13350 exec('import %s' % name, user_ns)
MinRK
added dependencies & Python scheduler
r3548 except ImportError:
MinRK
update API after sagedays29...
r3664 raise UnmetDependency(name)
MinRK
support non-modules in @require...
r9965
for name, cobj in mapping.items():
user_ns[name] = uncan(cobj, user_ns)
MinRK
added dependencies & Python scheduler
r3548 return True
MinRK
added dependency decorator
r3546
MinRK
support non-modules in @require...
r9965 def require(*objects, **mapping):
"""Simple decorator for requiring local objects and modules to be available
when the decorated function is called on the engine.
Modules specified by name or passed directly will be imported
prior to calling the decorated function.
Objects other than modules will be pushed as a part of the task.
Functions can be passed positionally,
and will be pushed to the engine with their __name__.
Other objects can be passed by keyword arg.
MinRK
cleanup pass
r3644
Thomas Kluyver
Various docs fixes
r13595 Examples::
MinRK
cleanup pass
r3644
Thomas Kluyver
Various docs fixes
r13595 In [1]: @require('numpy')
...: def norm(a):
...: return numpy.linalg.norm(a,2)
MinRK
support non-modules in @require...
r9965
Thomas Kluyver
Various docs fixes
r13595 In [2]: foo = lambda x: x*x
In [3]: @require(foo)
...: def bar(a):
...: return foo(1-a)
MinRK
cleanup pass
r3644 """
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 names = []
MinRK
support non-modules in @require...
r9965 for obj in objects:
if isinstance(obj, ModuleType):
obj = obj.__name__
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665
Thomas Kluyver
Replace references to unicode and basestring
r13353 if isinstance(obj, string_types):
MinRK
support non-modules in @require...
r9965 names.append(obj)
elif hasattr(obj, '__name__'):
mapping[obj.__name__] = obj
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 else:
MinRK
support non-modules in @require...
r9965 raise TypeError("Objects other than modules and functions "
"must be passed by kwarg, but got: %s" % type(obj)
)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665
MinRK
support non-modules in @require...
r9965 for name, obj in mapping.items():
mapping[name] = can(obj)
return depend(_require, *names, **mapping)
MinRK
added dependencies & Python scheduler
r3548
class Dependency(set):
MinRK
add timeout for unmet dependencies in task scheduler
r3611 """An object for representing a set of msg_id dependencies.
MinRK
added dependencies & Python scheduler
r3548
MinRK
cleanup pass
r3644 Subclassed from set().
Parameters
----------
dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
The msg_ids to depend on
all : bool [default True]
Whether the dependency should be considered met when *all* depending tasks have completed
or only when *any* have been completed.
MinRK
update API after sagedays29...
r3664 success : bool [default True]
Whether to consider successes as fulfilling dependencies.
failure : bool [default False]
Whether to consider failures as fulfilling dependencies.
If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
MinRK
cleanup pass
r3644 as soon as the first depended-upon task fails.
"""
MinRK
added dependencies & Python scheduler
r3548
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all=True
MinRK
update API after sagedays29...
r3664 success=True
failure=True
MinRK
added dependencies & Python scheduler
r3548
MinRK
update API after sagedays29...
r3664 def __init__(self, dependencies=[], all=True, success=True, failure=False):
MinRK
added dependencies & Python scheduler
r3548 if isinstance(dependencies, dict):
# load from dict
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all = dependencies.get('all', True)
MinRK
update API after sagedays29...
r3664 success = dependencies.get('success', success)
failure = dependencies.get('failure', failure)
MinRK
Improvements to dependency handling...
r3607 dependencies = dependencies.get('dependencies', [])
MinRK
dependency tweaks + dependency/scheduler docs
r3624 ids = []
MinRK
update API after sagedays29...
r3664
# extract ids from various sources:
Thomas Kluyver
Replace references to unicode and basestring
r13353 if isinstance(dependencies, string_types + (AsyncResult,)):
MinRK
update API after sagedays29...
r3664 dependencies = [dependencies]
for d in dependencies:
Thomas Kluyver
Replace references to unicode and basestring
r13353 if isinstance(d, string_types):
MinRK
update API after sagedays29...
r3664 ids.append(d)
elif isinstance(d, AsyncResult):
ids.extend(d.msg_ids)
else:
raise TypeError("invalid dependency type: %r"%type(d))
MinRK
dependency tweaks + dependency/scheduler docs
r3624 set.__init__(self, ids)
self.all = all
MinRK
update API after sagedays29...
r3664 if not (success or failure):
raise ValueError("Must depend on at least one of successes or failures!")
self.success=success
self.failure = failure
MinRK
added dependencies & Python scheduler
r3548
MinRK
Improvements to dependency handling...
r3607 def check(self, completed, failed=None):
MinRK
update API after sagedays29...
r3664 """check whether our dependencies have been met."""
MinRK
added dependencies & Python scheduler
r3548 if len(self) == 0:
return True
MinRK
update API after sagedays29...
r3664 against = set()
if self.success:
against = completed
if failed is not None and self.failure:
against = against.union(failed)
MinRK
dependency tweaks + dependency/scheduler docs
r3624 if self.all:
MinRK
update API after sagedays29...
r3664 return self.issubset(against)
MinRK
add all completed task IDs to Scheduler.all_done
r3565 else:
MinRK
update API after sagedays29...
r3664 return not self.isdisjoint(against)
MinRK
added dependencies & Python scheduler
r3548
MinRK
update API after sagedays29...
r3664 def unreachable(self, completed, failed=None):
"""return whether this dependency has become impossible."""
if len(self) == 0:
MinRK
Improvements to dependency handling...
r3607 return False
MinRK
update API after sagedays29...
r3664 against = set()
if not self.success:
against = completed
if failed is not None and not self.failure:
against = against.union(failed)
MinRK
dependency tweaks + dependency/scheduler docs
r3624 if self.all:
MinRK
update API after sagedays29...
r3664 return not self.isdisjoint(against)
MinRK
Improvements to dependency handling...
r3607 else:
MinRK
update API after sagedays29...
r3664 return self.issubset(against)
MinRK
Improvements to dependency handling...
r3607
MinRK
added dependencies & Python scheduler
r3548 def as_dict(self):
"""Represent this dependency as a dict. For json compatibility."""
return dict(
dependencies=list(self),
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all=self.all,
MinRK
update API after sagedays29...
r3664 success=self.success,
failure=self.failure
MinRK
added dependencies & Python scheduler
r3548 )
MinRK
update API after sagedays29...
r3664
MinRK
added dependencies & Python scheduler
r3548
MinRK
dependency tweaks + dependency/scheduler docs
r3624 __all__ = ['depend', 'require', 'dependent', 'Dependency']
MinRK
added dependency decorator
r3546