##// END OF EJS Templates
update Session object per review...
update Session object per review docstrings fleshed out, minor code cleanup.

File last commit:

r3673:b9f54806
r4017:74596588
Show More
dependency.py
196 lines | 6.1 KiB | text/x-python | PythonLexer
MinRK
added dependency decorator
r3546 """Dependency utilities"""
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added dependency decorator
r3546
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 from types import ModuleType
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.client.asyncresult import AsyncResult
from IPython.parallel.error import UnmetDependency
from IPython.parallel.util import interactive
MinRK
added basic tunneling with ssh or paramiko
r3571
MinRK
added dependency decorator
r3546 class depend(object):
MinRK
cleanup pass
r3644 """Dependency decorator, for use with tasks.
`@depend` lets you define a function for engine dependencies
just like you use `apply` for tasks.
Examples
--------
::
@depend(df, a,b, c=5)
def f(m,n,p)
view.apply(f, 1,2,3)
will call df(a,b,c=5) on the engine, and if it returns False or
raises an UnmetDependency error, then the task will not be run
and another engine will be tried.
"""
MinRK
added dependency decorator
r3546 def __init__(self, f, *args, **kwargs):
self.f = f
self.args = args
self.kwargs = kwargs
def __call__(self, f):
return dependent(f, self.f, *self.args, **self.kwargs)
class dependent(object):
"""A function that depends on another function.
This is an object to prevent the closure used
in traditional decorators, which are not picklable.
"""
def __init__(self, f, df, *dargs, **dkwargs):
self.f = f
MinRK
Improvements to dependency handling...
r3607 self.func_name = getattr(f, '__name__', 'f')
MinRK
added dependency decorator
r3546 self.df = df
self.dargs = dargs
self.dkwargs = dkwargs
def __call__(self, *args, **kwargs):
MinRK
update API after sagedays29...
r3664 # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'):
# self.df.func_globals = self.f.func_globals
MinRK
added dependency decorator
r3546 if self.df(*self.dargs, **self.dkwargs) is False:
raise UnmetDependency()
return self.f(*args, **kwargs)
MinRK
Improvements to dependency handling...
r3607
@property
def __name__(self):
return self.func_name
MinRK
added dependency decorator
r3546
MinRK
update API after sagedays29...
r3664 @interactive
MinRK
added dependencies & Python scheduler
r3548 def _require(*names):
MinRK
cleanup pass
r3644 """Helper for @require decorator."""
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel.error import UnmetDependency
MinRK
update API after sagedays29...
r3664 user_ns = globals()
MinRK
added dependencies & Python scheduler
r3548 for name in names:
MinRK
update API after sagedays29...
r3664 if name in user_ns:
continue
MinRK
added dependencies & Python scheduler
r3548 try:
MinRK
update API after sagedays29...
r3664 exec 'import %s'%name in user_ns
MinRK
added dependencies & Python scheduler
r3548 except ImportError:
MinRK
update API after sagedays29...
r3664 raise UnmetDependency(name)
MinRK
added dependencies & Python scheduler
r3548 return True
MinRK
added dependency decorator
r3546
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 def require(*mods):
MinRK
cleanup pass
r3644 """Simple decorator for requiring names to be importable.
Examples
--------
In [1]: @require('numpy')
...: def norm(a):
...: import numpy
...: return numpy.linalg.norm(a,2)
"""
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 names = []
for mod in mods:
if isinstance(mod, ModuleType):
mod = mod.__name__
if isinstance(mod, basestring):
names.append(mod)
else:
raise TypeError("names must be modules or module names, not %s"%type(mod))
MinRK
added dependencies & Python scheduler
r3548 return depend(_require, *names)
class Dependency(set):
MinRK
add timeout for unmet dependencies in task scheduler
r3611 """An object for representing a set of msg_id dependencies.
MinRK
added dependencies & Python scheduler
r3548
MinRK
cleanup pass
r3644 Subclassed from set().
Parameters
----------
dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
The msg_ids to depend on
all : bool [default True]
Whether the dependency should be considered met when *all* depending tasks have completed
or only when *any* have been completed.
MinRK
update API after sagedays29...
r3664 success : bool [default True]
Whether to consider successes as fulfilling dependencies.
failure : bool [default False]
Whether to consider failures as fulfilling dependencies.
If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
MinRK
cleanup pass
r3644 as soon as the first depended-upon task fails.
"""
MinRK
added dependencies & Python scheduler
r3548
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all=True
MinRK
update API after sagedays29...
r3664 success=True
failure=True
MinRK
added dependencies & Python scheduler
r3548
MinRK
update API after sagedays29...
r3664 def __init__(self, dependencies=[], all=True, success=True, failure=False):
MinRK
added dependencies & Python scheduler
r3548 if isinstance(dependencies, dict):
# load from dict
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all = dependencies.get('all', True)
MinRK
update API after sagedays29...
r3664 success = dependencies.get('success', success)
failure = dependencies.get('failure', failure)
MinRK
Improvements to dependency handling...
r3607 dependencies = dependencies.get('dependencies', [])
MinRK
dependency tweaks + dependency/scheduler docs
r3624 ids = []
MinRK
update API after sagedays29...
r3664
# extract ids from various sources:
if isinstance(dependencies, (basestring, AsyncResult)):
dependencies = [dependencies]
for d in dependencies:
if isinstance(d, basestring):
ids.append(d)
elif isinstance(d, AsyncResult):
ids.extend(d.msg_ids)
else:
raise TypeError("invalid dependency type: %r"%type(d))
MinRK
dependency tweaks + dependency/scheduler docs
r3624 set.__init__(self, ids)
self.all = all
MinRK
update API after sagedays29...
r3664 if not (success or failure):
raise ValueError("Must depend on at least one of successes or failures!")
self.success=success
self.failure = failure
MinRK
added dependencies & Python scheduler
r3548
MinRK
Improvements to dependency handling...
r3607 def check(self, completed, failed=None):
MinRK
update API after sagedays29...
r3664 """check whether our dependencies have been met."""
MinRK
added dependencies & Python scheduler
r3548 if len(self) == 0:
return True
MinRK
update API after sagedays29...
r3664 against = set()
if self.success:
against = completed
if failed is not None and self.failure:
against = against.union(failed)
MinRK
dependency tweaks + dependency/scheduler docs
r3624 if self.all:
MinRK
update API after sagedays29...
r3664 return self.issubset(against)
MinRK
add all completed task IDs to Scheduler.all_done
r3565 else:
MinRK
update API after sagedays29...
r3664 return not self.isdisjoint(against)
MinRK
added dependencies & Python scheduler
r3548
MinRK
update API after sagedays29...
r3664 def unreachable(self, completed, failed=None):
"""return whether this dependency has become impossible."""
if len(self) == 0:
MinRK
Improvements to dependency handling...
r3607 return False
MinRK
update API after sagedays29...
r3664 against = set()
if not self.success:
against = completed
if failed is not None and not self.failure:
against = against.union(failed)
MinRK
dependency tweaks + dependency/scheduler docs
r3624 if self.all:
MinRK
update API after sagedays29...
r3664 return not self.isdisjoint(against)
MinRK
Improvements to dependency handling...
r3607 else:
MinRK
update API after sagedays29...
r3664 return self.issubset(against)
MinRK
Improvements to dependency handling...
r3607
MinRK
added dependencies & Python scheduler
r3548 def as_dict(self):
"""Represent this dependency as a dict. For json compatibility."""
return dict(
dependencies=list(self),
MinRK
dependency tweaks + dependency/scheduler docs
r3624 all=self.all,
MinRK
update API after sagedays29...
r3664 success=self.success,
failure=self.failure
MinRK
added dependencies & Python scheduler
r3548 )
MinRK
update API after sagedays29...
r3664
MinRK
added dependencies & Python scheduler
r3548
MinRK
dependency tweaks + dependency/scheduler docs
r3624 __all__ = ['depend', 'require', 'dependent', 'Dependency']
MinRK
added dependency decorator
r3546