view.py
639 lines
| 22.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3584 | """Views of remote engines""" | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3641 | from IPython.testing import decorators as testdec | ||
MinRK
|
r3635 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance | ||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3635 | from IPython.zmq.parallel.asyncresult import AsyncResult | ||
from IPython.zmq.parallel.dependency import Dependency | ||||
MinRK
|
r3639 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote | ||
MinRK
|
r3539 | |||
MinRK
|
r3584 | #----------------------------------------------------------------------------- | ||
# Decorators | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
@decorator
def myblock(f, self, *args, **kwargs):
    """Run `f` with the client's `block` flag temporarily set to `self.block`.

    The client's previous `block` value is put back afterwards, even when
    `f` raises.
    """
    saved = self.client.block
    self.client.block = self.block
    try:
        return f(self, *args, **kwargs)
    finally:
        # always hand the client back in the state we found it
        self.client.block = saved
MinRK
|
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    The length of `self.client.history` is recorded before calling `f`;
    every msg_id the client appended during the call is then added to
    `self.history` and to `self.outstanding`.

    Returns whatever `f` returns.
    """
    n_previous = len(self.client.history)
    ret = f(self, *args, **kwargs)
    # Slice from the old length rather than with a negative count:
    # history[-0:] is the *whole* list, so a call that submitted nothing
    # would otherwise re-record every message the client ever sent.
    msg_ids = self.client.history[n_previous:]
    self.history.extend(msg_ids)
    # set.update is eager on every Python version (map() is lazy on Python 3)
    self.outstanding.update(msg_ids)
    return ret
@decorator
def sync_results(f, self, *args, **kwargs):
    """Call `f`, then copy newly finished results from the client.

    Any msg_id in `self.outstanding` that is no longer in
    `self.client.outstanding` is removed from `self.outstanding` and its
    entry in `self.client.results` is stored in `self.results`.

    Returns whatever `f` returns.
    """
    outcome = f(self, *args, **kwargs)
    pending = self.outstanding
    # ours, minus what the client is still waiting for
    no_longer_pending = pending.difference(self.client.outstanding)
    finished = pending.intersection(no_longer_pending)
    self.outstanding = pending.difference(finished)
    for msg_id in finished:
        self.results[msg_id] = self.client.results[msg_id]
    return outcome
@decorator
def spin_after(f, self, *args, **kwargs):
    """Call `f`, then `self.spin()`, and return the result of `f`.

    `spin` is only reached when `f` returns normally.
    """
    outcome = f(self, *args, **kwargs)
    self.spin()
    return outcome
MinRK
|
r3584 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3543 | |||
MinRK
|
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    A View pairs a client with a fixed set of `targets` and a few flag
    attributes (`block`, `bound`) that are forwarded as keyword arguments
    to the client's methods.

    Don't use this class, use subclasses.
    """
    # flags forwarded to the client (see `set_flags` for their meaning)
    block=Bool(False)
    bound=Bool(False)
    # msg_ids recorded by the `save_ids` decorator
    history=List()
    outstanding = Set()
    # msg_id -> result, copied from the client by the `sync_results` decorator
    results = Dict()
    client = Instance('IPython.zmq.parallel.client.Client')

    _ntargets = Int(1)
    _balanced = Bool(False)
    # names of the attributes that `_defaults` forwards as keyword arguments
    _default_names = List(['block', 'bound'])
    _targets = None

    def __init__(self, client=None, targets=None):
        """Create a View of `targets` on `client`.

        Parameters
        ----------
        client : Client, optional
            the client every call is forwarded to; when given, its `block`
            attribute becomes this View's initial `block`.
        targets : int, sized collection, or None
            the engine(s) this View refers to; read-only after construction.

        Raises
        ------
        AssertionError
            if the base `View` class itself is instantiated.
        """
        super(View, self).__init__(client=client)
        self._targets = targets
        # NOTE(review): anything that is not an int or None is measured with
        # len(), so a string such as 'all' counts its characters -- confirm.
        self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
        # `client` defaults to None: only inherit its blocking mode when a
        # client was actually given, instead of failing on None.block.
        if client is not None:
            self.block = client.block

        # Make sure every default name exists as an attribute (None when a
        # subclass lists a name that has no value yet), because `_defaults`
        # reads them with a plain getattr.
        for name in self._default_names:
            setattr(self, name, getattr(self, name, None))

        assert not self.__class__ is View, "Don't use base View objects, use subclasses"

    def __repr__(self):
        """Return '<ClassName targets>', shortening long target strings."""
        strtargets = str(self._targets)
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    @property
    def targets(self):
        """The targets given at construction (read-only)."""
        return self._targets

    @targets.setter
    def targets(self, value):
        raise AttributeError("Cannot set View `targets` after construction!")

    @property
    def balanced(self):
        """The value of the `_balanced` flag (read-only)."""
        return self._balanced

    @balanced.setter
    def balanced(self, value):
        raise AttributeError("Cannot set View `balanced` after construction!")

    def _defaults(self, *excludes):
        """return dict of our default attributes, excluding names given.

        `balanced` and `targets` are always included; each name in
        `_default_names` is included unless it appears in `excludes`.
        """
        d = dict(balanced=self._balanced, targets=self._targets)
        for name in self._default_names:
            if name not in excludes:
                d[name] = getattr(self, name)
        return d

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but
        with attributes that specify keyword arguments, those attributes
        can be set by keyword argument with this method.

        Parameters
        ----------
        block : bool
            whether to wait for results
        bound : bool
            whether to use the client's namespace

        Raises
        ------
        KeyError
            if a keyword is not one of this View's `_default_names`.
        """
        # validate everything first so that a bad name changes nothing
        for key in kwargs:
            if key not in self._default_names:
                raise KeyError("Invalid name: %r"%key)
        for name in ('block', 'bound'):
            if name in kwargs:
                setattr(self, name, kwargs[name])

    #----------------------------------------------------------------
    # wrappers for client methods:
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    @save_ids
    def apply(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines, returning the result.

        This View's `block` and `bound` flags (and any other default names)
        are forwarded to the client, so blocking behavior and namespace
        access follow the current flag values.

        if self.block is False:
            returns msg_id
        else:
            returns actual result of f(*args, **kwargs)
        """
        return self.client.apply(f, args, kwargs, **self._defaults())

    @save_ids
    def apply_async(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a nonblocking manner.

        Always submits with block=False and bound=False, regardless of this
        View's flags.

        returns msg_id
        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f,args,kwargs, block=False, bound=False, **d)

    @spin_after
    @save_ids
    def apply_sync(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a blocking manner,
        returning the result.

        Always submits with block=True and bound=False, regardless of this
        View's flags.

        returns: actual result of f(*args, **kwargs)
        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f,args,kwargs, block=True, bound=False, **d)

    # @sync_results
    # @save_ids
    # def apply_bound(self, f, *args, **kwargs):
    #     """calls f(*args, **kwargs) bound to engine namespace(s).
    #
    #     if self.block is False:
    #         returns msg_id
    #     else:
    #         returns actual result of f(*args, **kwargs)
    #
    #     This method has access to the targets' namespace via globals()
    #
    #     """
    #     d = self._defaults('bound')
    #     return self.client.apply(f, args, kwargs, bound=True, **d)
    #
    @sync_results
    @save_ids
    def apply_async_bound(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) bound to engine namespace(s)
        in a nonblocking manner.

        Always submits with block=False and bound=True.

        returns: msg_id

        This method has access to the targets' namespace via globals()

        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f, args, kwargs, block=False, bound=True, **d)

    @spin_after
    @save_ids
    def apply_sync_bound(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.

        Always submits with block=True and bound=True.

        returns: actual result of f(*args, **kwargs)

        This method has access to the targets' namespace via globals()

        """
        d = self._defaults('block', 'bound')
        return self.client.apply(f, args, kwargs, block=True, bound=True, **d)

    def abort(self, jobs=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------
        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        block : bool, optional
            passed to the client; defaults to self.block when None.
        """
        block = block if block is not None else self.block
        return self.client.abort(jobs=jobs, targets=self._targets, block=block)

    def queue_status(self, verbose=False):
        """Fetch the Queue status of my engines"""
        return self.client.queue_status(targets=self._targets, verbose=verbose)

    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results.

        `targets` is replaced by this View's targets only when it is None
        or 'all'.
        """
        # NOTE(review): the default `targets=[]` is neither None nor 'all',
        # so a bare call forwards an empty list rather than this View's
        # targets -- confirm that is intended.  The list defaults are never
        # mutated here; they are kept to leave the signature unchanged.
        if targets is None or targets == 'all':
            targets = self._targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    @spin_after
    def get_result(self, indices_or_msg_ids=None):
        """return one or more results, specified by history index or msg_id.

        Integers (alone, or inside a list/tuple/set) are treated as indices
        into this View's own `history`; the default is -1, the most recent
        entry.  Anything else is passed to the client unchanged.

        See client.get_result for details.
        """
        if indices_or_msg_ids is None:
            indices_or_msg_ids = -1
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            # work on a copy so the caller's collection is left untouched
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------
    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=False)

        See `self.map` for details.

        Raises TypeError if `block` is passed.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=True)

        See `self.map` for details.

        Raises TypeError if `block` is passed.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of `itertools.imap`.

        Returns an iterator over the result of `map_async`.

        See `self.map` for details.
        """
        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------
    def remote(self, bound=True, block=True):
        """Decorator for making a RemoteFunction"""
        # `remote` here is the module-level function, not this method
        return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)

    def parallel(self, dist='b', bound=True, block=None):
        """Decorator for making a ParallelFunction

        `block` defaults to self.block when None.
        """
        # NOTE(review): `dist` is accepted but never forwarded to the
        # module-level `parallel` -- confirm whether it should be passed on.
        block = self.block if block is None else block
        return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
MinRK
|
r3543 | |||
MinRK
|
@testdec.skip_doctest
class DirectView(View):
    """Direct Multiplexer View of one or more engines.

    These are created via indexed access to a client:

    >>> dv_1 = client[1]
    >>> dv_all = client[:]
    >>> dv_even = client[::2]
    >>> dv_some = client[1:3]

    This object provides dictionary access to engine namespaces:

    # push a=5:
    >>> dv['a'] = 5
    # pull 'foo':
    >>> dv['foo']

    """

    def __init__(self, client=None, targets=None):
        """Initialise the base View, then mark this View as not balanced."""
        super(DirectView, self).__init__(client=client, targets=targets)
        self._balanced = False

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult

        Parallel version of builtin `map`, using this View's `targets`.

        There will be one task per target, so work will be chunked
        if the sequences are longer than `targets`.

        Results can be iterated as they are ready, but will become available in chunks.

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]
        bound : bool
            whether to have access to the engines' namespaces [default self.bound]

        Returns
        -------
        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
        else:
            list
                the result of map(f,*sequences)
        """
        block = kwargs.get('block', self.block)
        bound = kwargs.get('bound', self.bound)
        allowed = ['block', 'bound']
        for key in kwargs:
            if key in allowed:
                continue
            raise TypeError("invalid keyword arg, %r"%key)

        assert sequences, "must have some sequences to map onto!"

        mapper = ParallelFunction(self.client, f, block=block, bound=bound,
                        targets=self._targets, balanced=False)
        return mapper.map(*sequences)

    @sync_results
    @save_ids
    def execute(self, code, block=True):
        """Run `code` on this View's targets through the client."""
        client = self.client
        return client.execute(code, block=block, targets=self._targets)

    def update(self, ns):
        """Push the dict `ns` to this View's targets, using self.block."""
        client = self.client
        return client.push(ns, targets=self._targets, block=self.block)

    # `push` is simply another name for `update`
    push = update

    def get(self, key_s):
        """Pull `key_s` from this View's targets, always with block=True.

        `key_s` is handed to the client's pull unchanged; it may be a single
        key or a list of keys.
        """
        client = self.client
        return client.pull(key_s, block=True, targets=self._targets)

    @sync_results
    @save_ids
    def pull(self, key_s, block=True):
        """Pull `key_s` from this View's targets.

        `key_s` may be a single key or a list of keys.  Passing block=None
        falls back to self.block.
        """
        if block is None:
            block = self.block
        return self.client.pull(key_s, block=block, targets=self._targets)

    def scatter(self, key, seq, dist='b', flatten=False, block=None):
        """
        Partition the sequence `seq` and send the pieces to this View's
        targets under the name `key`.  block=None means self.block.
        """
        if block is None:
            block = self.block
        return self.client.scatter(key, seq, dist=dist, flatten=flatten,
                            targets=self._targets, block=block)

    @sync_results
    @save_ids
    def gather(self, key, dist='b', block=None):
        """
        Collect the partitioned sequence `key` from this View's targets
        into a single local sequence.  block=None means self.block.
        """
        if block is None:
            block = self.block
        return self.client.gather(key, dist=dist, targets=self._targets, block=block)

    def __getitem__(self, key):
        """dv[key] is shorthand for dv.get(key)."""
        return self.get(key)

    def __setitem__(self,key, value):
        """dv[key] = value is shorthand for dv.update({key: value})."""
        self.update({key:value})

    def clear(self, block=False):
        """Clear the remote namespaces on my engines (block=None: self.block)."""
        if block is None:
            block = self.block
        return self.client.clear(targets=self._targets, block=block)

    def kill(self, block=True):
        """Kill my engines (block=None: self.block)."""
        if block is None:
            block = self.block
        return self.client.kill(targets=self._targets, block=block)

    #----------------------------------------
    # activate for %px,%autopx magics
    #----------------------------------------
    def activate(self):
        """Make this `View` active for parallel magic commands.

        IPython has a magic command syntax to work with `MultiEngineClient` objects.
        In a given IPython session there is a single active one.  While
        there can be many `Views` created and used by the user,
        there is only one active one.  The active `View` is used whenever
        the magic commands %px and %autopx are used.

        The activate() method is called on a given `View` to make it
        active.  Once this has been done, the magic commands can be used.
        """
        try:
            # This is injected into __builtins__.
            shell = get_ipython()
        except NameError:
            print("The IPython parallel magics (%result, %px, %autopx) only work within IPython.")
            return

        plugin = shell.plugin_manager.get_plugin('parallelmagic')
        if plugin is None:
            print("You must first load the parallelmagic extension "
                  "by doing '%load_ext parallelmagic'")
        else:
            plugin.active_multiengine_client = self
MinRK
|
r3641 | |||
@testdec.skip_doctest
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `view` method:

    >>> v = client.view(balanced=True)

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.view([1,3],balanced=True)

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # In addition to block/bound, these flags are forwarded by `_defaults`
    # and can be set through `set_flags`.
    _default_names = ['block', 'bound', 'follow', 'after', 'timeout']

    def __init__(self, client=None, targets=None):
        """Initialise the base View, then force `_ntargets` to 1 and mark
        this View as balanced."""
        super(LoadBalancedView, self).__init__(client=client, targets=targets)
        self._ntargets = 1
        self._balanced = True

    def _validate_dependency(self, dep):
        """validate a dependency.

        For use in `set_flags`.

        Accepted forms:

        * None, a str, an AsyncResult or a Dependency
        * a list, set or tuple whose items are all str or AsyncResult
        * a dict with exactly the keys of `Dependency().as_dict()` whose
          'msg_ids' entry is a list of str

        Returns True when `dep` has one of these forms, False otherwise.
        """
        if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
            return True
        elif isinstance(dep, (list,set, tuple)):
            for d in dep:
                # isinstance takes the allowed types as ONE tuple argument
                if not isinstance(d, (str, AsyncResult)):
                    return False
        elif isinstance(dep, dict):
            if set(dep.keys()) != set(Dependency().as_dict().keys()):
                return False
            if not isinstance(dep['msg_ids'], list):
                return False
            for d in dep['msg_ids']:
                if not isinstance(d, str):
                    return False
        else:
            return False
        # a collection or dict that passed every check above is valid;
        # without this the method returned None and was treated as invalid
        return True

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------
        block : bool
            whether to wait for results
        bound : bool
            whether to use the engine's namespace
        follow : Dependency, list, msg_id, AsyncResult
            the location dependencies of tasks
        after : Dependency, list, msg_id, AsyncResult
            the time dependencies of tasks
        timeout : int,None
            the timeout to be used for tasks

        Raises
        ------
        KeyError
            if a keyword is not one of `_default_names`.
        ValueError
            if `follow`/`after` is not a valid dependency, or `timeout` is
            negative.
        TypeError
            if `timeout` is neither a number nor None.
        """
        # the base class validates the names and sets block/bound
        super(LoadBalancedView, self).set_flags(**kwargs)
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    raise ValueError("Invalid dependency: %r"%value)
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            # None itself is not a type: use type(None) so that timeout=None
            # is accepted instead of making isinstance raise TypeError
            if not isinstance(t, (int, long, float, type(None))):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t is not None:
                if t < 0:
                    raise ValueError("Invalid timeout: %s"%t)
            self.timeout = t

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult

        Parallel version of builtin `map`, load-balanced by this View.

        `block`, `bound`, and `chunk_size` can be specified by keyword only.

        Each `chunk_size` elements will be a separate task, and will be
        load-balanced.  This lets individual elements be available for iteration
        as soon as they arrive.

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]
        bound : bool
            whether to use the engine's namespace [default self.bound]
        chunk_size : int
            how many elements should be in each task [default 1]

        Returns
        -------
        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
        else:
            the result of map(f,*sequences)

        Raises
        ------
        TypeError
            if any keyword other than block, bound or chunk_size is given.
        """
        # default
        block = kwargs.get('block', self.block)
        bound = kwargs.get('bound', self.bound)
        chunk_size = kwargs.get('chunk_size', 1)

        # set.difference returns the leftover names; difference_update works
        # in place and returns None, which made this check a no-op
        extra_keys = set(kwargs.keys()).difference(['block', 'bound', 'chunk_size'])
        if extra_keys:
            raise TypeError("Invalid kwargs: %s"%list(extra_keys))

        assert len(sequences) > 0, "must have some sequences to map onto!"

        pf = ParallelFunction(self.client, f, block=block, bound=bound,
                        targets=self._targets, balanced=True,
                        chunk_size=chunk_size)
        return pf.map(*sequences)