view.py
669 lines
| 23.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3644 | """Views of remote engines.""" | |
MinRK
|
r3584 | #----------------------------------------------------------------------------- | |
# Copyright (C) 2010 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
MinRK
|
r3641 | from IPython.testing import decorators as testdec | |
MinRK
|
r3644 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance | |
MinRK
|
r3635 | ||
MinRK
|
r3539 | from IPython.external.decorator import decorator | |
MinRK
|
r3642 | ||
from .asyncresult import AsyncResult | |||
from .dependency import Dependency | |||
from .remotefunction import ParallelFunction, parallel, remote | |||
MinRK
|
r3539 | ||
MinRK
|
r3584 | #----------------------------------------------------------------------------- | |
# Decorators | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
@decorator
def myblock(f, self, *args, **kwargs):
    """Run `f` with the client's `block` flag replaced by this view's.

    The client's previous `block` value is saved before the call and put
    back afterwards, whether `f` returns normally or raises.
    """
    saved_block = self.client.block
    self.client.block = self.block
    try:
        return f(self, *args, **kwargs)
    finally:
        # restore the client's own setting no matter how `f` exited
        self.client.block = saved_block
MinRK
|
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Every msg_id that the wrapped call appended to `self.client.history`
    is appended to `self.history` and added to `self.outstanding`.

    Returns whatever `f` returns.
    """
    n_previous = len(self.client.history)
    ret = f(self, *args, **kwargs)
    # Slice from the old length instead of using a negative count: if the
    # call submitted nothing, `history[-0:]` would be the *entire* history
    # and every old msg_id would be recorded again.
    msg_ids = self.client.history[n_previous:]
    self.history.extend(msg_ids)
    self.outstanding.update(msg_ids)
    return ret
@decorator
def sync_results(f, self, *args, **kwargs):
    """Copy newly finished results from self.client into self.results.

    After calling `f`, any msg_id that is in `self.outstanding` but not in
    `self.client.outstanding` is treated as finished: it is removed from
    `self.outstanding` and its entry is copied from `self.client.results`.

    Returns whatever `f` returns.
    """
    outcome = f(self, *args, **kwargs)
    mine = self.outstanding
    # ours that the client no longer lists as outstanding
    finished = mine.intersection(mine.difference(self.client.outstanding))
    self.outstanding = mine.difference(finished)
    for msg_id in finished:
        self.results[msg_id] = self.client.results[msg_id]
    return outcome
@decorator
def spin_after(f, self, *args, **kwargs):
    """Invoke `self.spin()` once the wrapped method has returned.

    The wrapped method's return value is passed through unchanged.  If the
    method raises, `spin` is not called.
    """
    outcome = f(self, *args, **kwargs)
    self.spin()
    return outcome
MinRK
|
r3584 | #----------------------------------------------------------------------------- | |
# Classes | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3543 | ||
MinRK
|
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    A View pairs a client with a fixed set of targets.  The attributes named
    in `_default_names` (here `block`, `bound` and `track`), together with
    `balanced` and `targets`, are forwarded as keyword arguments to
    `client.apply` by the `apply*` methods.

    Don't use this class, use subclasses.
    """
    # flags forwarded to client.apply (see `_defaults`)
    block=Bool(False)
    bound=Bool(False)
    track=Bool(False)
    # msg_ids recorded by the `save_ids` decorator
    history=List()
    # msg_ids recorded by `save_ids` and not yet moved to `results`
    outstanding = Set()
    # msg_id -> result, filled in by the `sync_results` decorator
    results = Dict()
    client = Instance('IPython.zmq.parallel.client.Client')

    _ntargets = Int(1)
    _balanced = Bool(False)
    # names of the attributes that `_defaults` and `set_flags` operate on
    _default_names = List(['block', 'bound', 'track'])
    _targets = Any()

    def __init__(self, client=None, targets=None):
        """Create a view of `targets` on `client`.

        Parameters
        ----------
        client : Client
            the client to wrap.  Despite the default of None, a client is
            required: its `block` attribute is read here.
        targets : int, sequence or None
            the targets of this view.  An int or None counts as one target,
            anything else as `len(targets)` targets.
        """
        super(View, self).__init__(client=client)
        self._targets = targets
        self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
        self.block = client.block

        # make sure every default name exists as an attribute; names a
        # subclass lists without declaring them start out as None
        for name in self._default_names:
            setattr(self, name, getattr(self, name, None))

        assert not self.__class__ is View, "Don't use base View objects, use subclasses"

    def __repr__(self):
        strtargets = str(self._targets)
        # abbreviate long target lists
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    @property
    def targets(self):
        """The targets of this view (read-only)."""
        return self._targets

    @targets.setter
    def targets(self, value):
        raise AttributeError("Cannot set View `targets` after construction!")

    @property
    def balanced(self):
        """Whether this view is load-balanced (read-only)."""
        return self._balanced

    @balanced.setter
    def balanced(self, value):
        raise AttributeError("Cannot set View `balanced` after construction!")

    def _defaults(self, *excludes):
        """return dict of our default attributes, excluding names given.

        The dict always contains `balanced` and `targets`, plus the current
        value of every name in `_default_names` that is not in `excludes`.
        """
        d = dict(balanced=self._balanced, targets=self._targets)
        for name in self._default_names:
            if name not in excludes:
                d[name] = getattr(self, name)
        return d

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but
        with attributes that specify keyword arguments, those attributes
        can be set by keyword argument with this method.

        Parameters
        ----------
        block : bool
            whether to wait for results
        bound : bool
            whether to pass the client's Namespace as the first argument
            to functions called via `apply`.
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        Raises
        ------
        KeyError
            if a keyword is not one of this view's `_default_names`.
        """
        for key in kwargs:
            if key not in self._default_names:
                raise KeyError("Invalid name: %r"%key)
        # `track` is accepted and documented above, so it must be stored
        # too; previously only `block` and `bound` were.
        for name in ('block', 'bound', 'track'):
            if name in kwargs:
                setattr(self, name, kwargs[name])

    #----------------------------------------------------------------
    # wrappers for client methods:
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    @save_ids
    def apply(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines, returning the result.

        This method sets all of `client.apply`'s keyword arguments via this
        View's attributes.

        if self.block is False:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs)
        """
        return self.client.apply(f, args, kwargs, **self._defaults())

    @save_ids
    def apply_async(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a nonblocking manner.

        `block` and `bound` are forced to False; the other defaults of this
        view are passed through.

        returns AsyncResult
        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f,args,kwargs, block=False, bound=False, **d)

    @spin_after
    @save_ids
    def apply_sync(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a blocking manner,
        returning the result.

        `block` is forced to True and `bound` to False; this view's `track`
        value is not forwarded.

        returns: actual result of f(*args, **kwargs)
        """
        d = self._defaults('block', 'bound', 'track')
        return self.client.apply(f,args,kwargs, block=True, bound=False, **d)

    @sync_results
    @save_ids
    def apply_async_bound(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) bound to engine namespace(s)
        in a nonblocking manner.

        The first argument to `f` will be the Engine's Namespace

        returns: AsyncResult

        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f, args, kwargs, block=False, bound=True, **d)

    @spin_after
    @save_ids
    def apply_sync_bound(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.

        The first argument to `f` will be the Engine's Namespace

        returns: actual result of f(*args, **kwargs)

        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f, args, kwargs, block=True, bound=True, **d)

    def abort(self, jobs=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------
        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        block : bool, optional
            falls back to `self.block` when None.
        """
        block = block if block is not None else self.block
        return self.client.abort(jobs=jobs, targets=self._targets, block=block)

    def queue_status(self, verbose=False):
        """Fetch the Queue status of my engines"""
        return self.client.queue_status(targets=self._targets, verbose=verbose)

    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results.

        If `targets` is None or 'all', this view's own targets are used.
        The default lists are only passed through to the client, never
        mutated here.
        """
        if targets is None or targets == 'all':
            targets = self._targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    @spin_after
    def get_result(self, indices_or_msg_ids=None):
        """return one or more results, specified by history index or msg_id.

        Integer entries are looked up in this view's `history` to obtain
        the msg_id; the default (None) means the most recent entry.

        See client.get_result for details.
        """
        if indices_or_msg_ids is None:
            indices_or_msg_ids = -1
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            # work on a copy so the caller's container is left untouched
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------
    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=False)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=True)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of `itertools.imap`.

        Returns an iterator over the result of `map_async`.

        See `self.map` for details.
        """
        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------
    def remote(self, bound=False, block=True):
        """Decorator for making a RemoteFunction"""
        return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)

    def parallel(self, dist='b', bound=False, block=None):
        """Decorator for making a ParallelFunction

        `block` falls back to `self.block` when None.
        """
        # NOTE(review): `dist` is accepted but not forwarded to `parallel`;
        # confirm whether it should be passed along.
        block = self.block if block is None else block
        return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
MinRK
|
r3543 | ||
MinRK
|
r3641 | @testdec.skip_doctest | |
MinRK
|
r3539 | class DirectView(View): | |
MinRK
|
r3584 | """Direct Multiplexer View of one or more engines. | |
These are created via indexed access to a client: | |||
>>> dv_1 = client[1] | |||
>>> dv_all = client[:] | |||
>>> dv_even = client[::2] | |||
>>> dv_some = client[1:3] | |||
MinRK
|
r3624 | This object provides dictionary access to engine namespaces: | |
# push a=5: | |||
>>> dv['a'] = 5 | |||
# pull 'foo': | |||
>>> db['foo'] | |||
MinRK
|
r3590 | ||
MinRK
|
r3584 | """ | |
MinRK
|
r3539 | ||
MinRK
|
r3635 | def __init__(self, client=None, targets=None): | |
super(DirectView, self).__init__(client=client, targets=targets) | |||
self._balanced = False | |||
@spin_after | |||
@save_ids | |||
def map(self, f, *sequences, **kwargs): | |||
MinRK
|
r3639 | """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult | |
Parallel version of builtin `map`, using this View's `targets`. | |||
MinRK
|
r3635 | ||
There will be one task per target, so work will be chunked | |||
if the sequences are longer than `targets`. | |||
Results can be iterated as they are ready, but will become available in chunks. | |||
Parameters | |||
---------- | |||
f : callable | |||
function to be mapped | |||
*sequences: one or more sequences of matching length | |||
the sequences to be distributed and passed to `f` | |||
block : bool | |||
whether to wait for the result or not [default self.block] | |||
bound : bool | |||
MinRK
|
r3655 | whether to pass the client's Namespace as the first argument to `f` | |
MinRK
|
r3635 | ||
Returns | |||
------- | |||
if block=False: | |||
AsyncMapResult | |||
An object like AsyncResult, but which reassembles the sequence of results | |||
into a single list. AsyncMapResults can be iterated through before all | |||
results are complete. | |||
MinRK
|
r3639 | else: | |
list | |||
MinRK
|
r3635 | the result of map(f,*sequences) | |
""" | |||
block = kwargs.get('block', self.block) | |||
bound = kwargs.get('bound', self.bound) | |||
for k in kwargs.keys(): | |||
if k not in ['block', 'bound']: | |||
raise TypeError("invalid keyword arg, %r"%k) | |||
assert len(sequences) > 0, "must have some sequences to map onto!" | |||
MinRK
|
r3636 | pf = ParallelFunction(self.client, f, block=block, bound=bound, | |
MinRK
|
r3639 | targets=self._targets, balanced=False) | |
MinRK
|
r3635 | return pf.map(*sequences) | |
MinRK
|
r3590 | @sync_results | |
@save_ids | |||
MinRK
|
r3642 | def execute(self, code, block=None): | |
MinRK
|
r3590 | """execute some code on my targets.""" | |
MinRK
|
r3642 | ||
block = block if block is not None else self.block | |||
MinRK
|
r3639 | return self.client.execute(code, block=block, targets=self._targets) | |
MinRK
|
r3590 | ||
MinRK
|
r3642 | @sync_results | |
@save_ids | |||
def run(self, fname, block=None): | |||
"""execute the code in a file on my targets.""" | |||
block = block if block is not None else self.block | |||
return self.client.run(fname, block=block, targets=self._targets) | |||
MinRK
|
r3539 | def update(self, ns): | |
"""update remote namespace with dict `ns`""" | |||
MinRK
|
r3639 | return self.client.push(ns, targets=self._targets, block=self.block) | |
MinRK
|
r3539 | ||
MinRK
|
r3642 | def push(self, ns, block=None): | |
"""update remote namespace with dict `ns`""" | |||
block = block if block is not None else self.block | |||
return self.client.push(ns, targets=self._targets, block=block) | |||
MinRK
|
r3641 | ||
MinRK
|
r3539 | def get(self, key_s): | |
"""get object(s) by `key_s` from remote namespace | |||
will return one object if it is a key. | |||
It also takes a list of keys, and will return a list of objects.""" | |||
# block = block if block is not None else self.block | |||
MinRK
|
r3639 | return self.client.pull(key_s, block=True, targets=self._targets) | |
MinRK
|
r3539 | ||
MinRK
|
r3590 | @sync_results | |
@save_ids | |||
MinRK
|
r3559 | def pull(self, key_s, block=True): | |
"""get object(s) by `key_s` from remote namespace | |||
will return one object if it is a key. | |||
It also takes a list of keys, and will return a list of objects.""" | |||
block = block if block is not None else self.block | |||
MinRK
|
r3639 | return self.client.pull(key_s, block=block, targets=self._targets) | |
MinRK
|
r3539 | ||
MinRK
|
r3641 | def scatter(self, key, seq, dist='b', flatten=False, block=None): | |
MinRK
|
r3587 | """ | |
Partition a Python sequence and send the partitions to a set of engines. | |||
""" | |||
block = block if block is not None else self.block | |||
return self.client.scatter(key, seq, dist=dist, flatten=flatten, | |||
MinRK
|
r3641 | targets=self._targets, block=block) | |
MinRK
|
r3587 | ||
MinRK
|
r3590 | @sync_results | |
@save_ids | |||
MinRK
|
r3641 | def gather(self, key, dist='b', block=None): | |
MinRK
|
r3587 | """ | |
Gather a partitioned sequence on a set of engines as a single local seq. | |||
""" | |||
block = block if block is not None else self.block | |||
MinRK
|
r3641 | return self.client.gather(key, dist=dist, targets=self._targets, block=block) | |
MinRK
|
r3587 | ||
MinRK
|
r3539 | def __getitem__(self, key): | |
return self.get(key) | |||
MinRK
|
r3559 | def __setitem__(self,key, value): | |
MinRK
|
r3539 | self.update({key:value}) | |
MinRK
|
r3540 | def clear(self, block=False): | |
"""Clear the remote namespaces on my engines.""" | |||
block = block if block is not None else self.block | |||
MinRK
|
r3639 | return self.client.clear(targets=self._targets, block=block) | |
MinRK
|
r3540 | ||
def kill(self, block=True): | |||
"""Kill my engines.""" | |||
block = block if block is not None else self.block | |||
MinRK
|
r3639 | return self.client.kill(targets=self._targets, block=block) | |
MinRK
|
r3539 | ||
MinRK
|
r3590 | #---------------------------------------- | |
# activate for %px,%autopx magics | |||
#---------------------------------------- | |||
def activate(self): | |||
"""Make this `View` active for parallel magic commands. | |||
IPython has a magic command syntax to work with `MultiEngineClient` objects. | |||
In a given IPython session there is a single active one. While | |||
there can be many `Views` created and used by the user, | |||
there is only one active one. The active `View` is used whenever | |||
the magic commands %px and %autopx are used. | |||
The activate() method is called on a given `View` to make it | |||
active. Once this has been done, the magic commands can be used. | |||
""" | |||
try: | |||
# This is injected into __builtins__. | |||
ip = get_ipython() | |||
except NameError: | |||
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |||
else: | |||
pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |||
if pmagic is not None: | |||
pmagic.active_multiengine_client = self | |||
else: | |||
print "You must first load the parallelmagic extension " \ | |||
"by doing '%load_ext parallelmagic'" | |||
MinRK
|
r3641 | ||
@testdec.skip_doctest
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `view` method:

    >>> v = client.view(balanced=True)

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.view([1,3],balanced=True)

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # attribute names forwarded to client.apply and settable via `set_flags`
    _default_names = ['block', 'bound', 'follow', 'after', 'timeout']

    def __init__(self, client=None, targets=None):
        """Build the view via the base class and mark it as balanced."""
        super(LoadBalancedView, self).__init__(client=client, targets=targets)
        self._ntargets = 1
        self._balanced = True

    def _validate_dependency(self, dep):
        """validate a dependency.

        For use in `set_flags`.

        Accepted forms: None, a str, an AsyncResult, a Dependency, a
        list/set/tuple whose items are all str or AsyncResult, or a dict
        with the same keys as `Dependency().as_dict()` whose 'msg_ids' is a
        list of str.

        Returns True if `dep` is acceptable, False otherwise.
        """
        if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
            return True
        elif isinstance(dep, (list,set, tuple)):
            for d in dep:
                # isinstance takes the candidate types as ONE tuple argument
                if not isinstance(d, (str, AsyncResult)):
                    return False
        elif isinstance(dep, dict):
            if set(dep.keys()) != set(Dependency().as_dict().keys()):
                return False
            if not isinstance(dep['msg_ids'], list):
                return False
            for d in dep['msg_ids']:
                if not isinstance(d, str):
                    return False
        else:
            return False

        # every check passed; without this the container cases returned
        # None and were treated as invalid by `set_flags`
        return True

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------
        block : bool
            whether to wait for results
        bound : bool
            whether to pass the client's Namespace as the first argument
            to functions called via `apply`.
        follow : Dependency, list, msg_id, AsyncResult
            the location dependencies of tasks
        after : Dependency, list, msg_id, AsyncResult
            the time dependencies of tasks
        timeout : int,None
            the timeout to be used for tasks

        NOTE(review): `track` is not in this class's `_default_names`, so
        passing it is rejected with KeyError by the base class; confirm
        whether it should be supported here.

        Raises
        ------
        KeyError
            if a keyword is not one of `_default_names`.
        ValueError
            if `follow`/`after` is not a valid dependency, or `timeout` is
            negative.
        TypeError
            if `timeout` is neither a number nor None.
        """
        super(LoadBalancedView, self).set_flags(**kwargs)
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    raise ValueError("Invalid dependency: %r"%value)
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            # `None` is not a type; use type(None) so timeout=None is
            # accepted instead of making isinstance raise TypeError
            if not isinstance(t, (int, long, float, type(None))):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t is not None:
                if t < 0:
                    raise ValueError("Invalid timeout: %s"%t)
            self.timeout = t

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult

        Parallel version of builtin `map`, load-balanced by this View.

        `block`, `bound`, and `chunk_size` can be specified by keyword only.

        Each `chunk_size` elements will be a separate task, and will be
        load-balanced. This lets individual elements be available for iteration
        as soon as they arrive.

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]
        bound : bool
            whether to pass the client's Namespace as the first argument to `f`
        chunk_size : int
            how many elements should be in each task [default 1]

        Returns
        -------
        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
        else:
            the result of map(f,*sequences)

        Raises
        ------
        TypeError
            if any keyword other than `block`, `bound` or `chunk_size` is given.
        """
        # default
        block = kwargs.get('block', self.block)
        bound = kwargs.get('bound', self.bound)
        chunk_size = kwargs.get('chunk_size', 1)

        # `difference` returns the leftover keys; `difference_update` works
        # in place and returns None, so unknown keywords were never caught
        extra_keys = set(kwargs.keys()).difference(['block', 'bound', 'chunk_size'])
        if extra_keys:
            raise TypeError("Invalid kwargs: %s"%list(extra_keys))

        assert len(sequences) > 0, "must have some sequences to map onto!"

        pf = ParallelFunction(self.client, f, block=block, bound=bound,
                              targets=self._targets, balanced=True,
                              chunk_size=chunk_size)
        return pf.map(*sequences)
MinRK
|
r3644 | __all__ = ['LoadBalancedView', 'DirectView'] |