##// END OF EJS Templates
launcher updates for PBS
launcher updates for PBS

File last commit:

r3644:f36800d2
r3645:8a8cbf5d
Show More
view.py
657 lines | 23.1 KiB | text/x-python | PythonLexer
MinRK
cleanup pass
r3644 """Views of remote engines."""
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
testing fixes
r3641 from IPython.testing import decorators as testdec
MinRK
cleanup pass
r3644 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance
MinRK
API update involving map and load-balancing
r3635
MinRK
prep newparallel for rebase...
r3539 from IPython.external.decorator import decorator
MinRK
eliminate relative imports
r3642
from .asyncresult import AsyncResult
from .dependency import Dependency
from .remotefunction import ParallelFunction, parallel, remote
MinRK
prep newparallel for rebase...
r3539
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
@decorator
def myblock(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """override client.block with self.block during a call"""
MinRK
prep newparallel for rebase...
r3539 block = self.client.block
self.client.block = self.block
MinRK
tweaks related to docs + add activate() for magics
r3590 try:
ret = f(self, *args, **kwargs)
finally:
self.client.block = block
MinRK
prep newparallel for rebase...
r3539 return ret
MinRK
view decorators for syncing history/results
r3543 @decorator
def save_ids(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """Keep our history and outstanding attributes up to date after a method call."""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 n_previous = len(self.client.history)
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 nmsgs = len(self.client.history) - n_previous
msg_ids = self.client.history[-nmsgs:]
MinRK
view decorators for syncing history/results
r3543 self.history.extend(msg_ids)
map(self.outstanding.add, msg_ids)
return ret
@decorator
def sync_results(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """sync relevant results from self.client to our results attribute."""
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
delta = self.outstanding.difference(self.client.outstanding)
completed = self.outstanding.intersection(delta)
self.outstanding = self.outstanding.difference(completed)
for msg_id in completed:
self.results[msg_id] = self.client.results[msg_id]
return ret
@decorator
def spin_after(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """call spin after the method."""
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
self.spin()
return ret
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
view decorators for syncing history/results
r3543
MinRK
API update involving map and load-balancing
r3635 class View(HasTraits):
MinRK
some docstring cleanup
r3584 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
Don't use this class, use subclasses.
"""
MinRK
API update involving map and load-balancing
r3635 block=Bool(False)
bound=Bool(False)
history=List()
outstanding = Set()
results = Dict()
client = Instance('IPython.zmq.parallel.client.Client')
_ntargets = Int(1)
_balanced = Bool(False)
_default_names = List(['block', 'bound'])
MinRK
cleanup pass
r3644 _targets = Any()
MinRK
prep newparallel for rebase...
r3539
MinRK
API update involving map and load-balancing
r3635 def __init__(self, client=None, targets=None):
super(View, self).__init__(client=client)
MinRK
prep newparallel for rebase...
r3539 self._targets = targets
MinRK
basic LoadBalancedView, RemoteFunction
r3559 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
MinRK
prep newparallel for rebase...
r3539 self.block = client.block
MinRK
API update involving map and load-balancing
r3635
MinRK
allow load-balancing across subsets of engines
r3625 for name in self._default_names:
setattr(self, name, getattr(self, name, None))
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 def __repr__(self):
strtargets = str(self._targets)
if len(strtargets) > 16:
strtargets = strtargets[:12]+'...]'
return "<%s %s>"%(self.__class__.__name__, strtargets)
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 @property
def targets(self):
return self._targets
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 @targets.setter
def targets(self, value):
MinRK
API update involving map and load-balancing
r3635 raise AttributeError("Cannot set View `targets` after construction!")
MinRK
allow load-balancing across subsets of engines
r3625
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 @property
def balanced(self):
return self._balanced
@balanced.setter
def balanced(self, value):
raise AttributeError("Cannot set View `balanced` after construction!")
MinRK
allow load-balancing across subsets of engines
r3625 def _defaults(self, *excludes):
"""return dict of our default attributes, excluding names given."""
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 d = dict(balanced=self._balanced, targets=self._targets)
MinRK
allow load-balancing across subsets of engines
r3625 for name in self._default_names:
if name not in excludes:
d[name] = getattr(self, name)
return d
MinRK
API update involving map and load-balancing
r3635 def set_flags(self, **kwargs):
"""set my attribute flags by keyword.
A View is a wrapper for the Client's apply method, but
with attributes that specify keyword arguments, those attributes
can be set by keyword argument with this method.
Parameters
----------
block : bool
whether to wait for results
bound : bool
whether to use the client's namespace
"""
for key in kwargs:
if key not in self._default_names:
raise KeyError("Invalid name: %r"%key)
for name in ('block', 'bound'):
if name in kwargs:
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 setattr(self, name, kwargs[name])
MinRK
API update involving map and load-balancing
r3635
#----------------------------------------------------------------
# wrappers for client methods:
#----------------------------------------------------------------
MinRK
view decorators for syncing history/results
r3543 @sync_results
def spin(self):
"""spin the client, and sync"""
self.client.spin()
MinRK
allow load-balancing across subsets of engines
r3625
MinRK
view decorators for syncing history/results
r3543 @sync_results
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines, returning the result.
This method does not involve the engine's namespace.
if self.block is False:
returns msg_id
else:
returns actual result of f(*args, **kwargs)
"""
MinRK
API update involving map and load-balancing
r3635 return self.client.apply(f, args, kwargs, **self._defaults())
MinRK
view decorators for syncing history/results
r3543
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_async(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
This method does not involve the engine's namespace.
returns msg_id
"""
MinRK
allow load-balancing across subsets of engines
r3625 d = self._defaults('block', 'bound')
MinRK
API update involving map and load-balancing
r3635 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
MinRK
view decorators for syncing history/results
r3543
@spin_after
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_sync(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
returning the result.
This method does not involve the engine's namespace.
returns: actual result of f(*args, **kwargs)
"""
MinRK
allow load-balancing across subsets of engines
r3625 d = self._defaults('block', 'bound')
MinRK
API update involving map and load-balancing
r3635 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
MinRK
view decorators for syncing history/results
r3543
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 # @sync_results
# @save_ids
# def apply_bound(self, f, *args, **kwargs):
# """calls f(*args, **kwargs) bound to engine namespace(s).
#
# if self.block is False:
# returns msg_id
# else:
# returns actual result of f(*args, **kwargs)
#
# This method has access to the targets' namespace via globals()
#
# """
# d = self._defaults('bound')
# return self.client.apply(f, args, kwargs, bound=True, **d)
#
MinRK
view decorators for syncing history/results
r3543 @sync_results
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_async_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s)
in a nonblocking manner.
returns: msg_id
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 This method has access to the targets' namespace via globals()
MinRK
prep newparallel for rebase...
r3539
"""
MinRK
allow load-balancing across subsets of engines
r3625 d = self._defaults('block', 'bound')
MinRK
API update involving map and load-balancing
r3635 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
MinRK
view decorators for syncing history/results
r3543
MinRK
whitespace
r3544 @spin_after
MinRK
view decorators for syncing history/results
r3543 @save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_sync_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
returns: actual result of f(*args, **kwargs)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 This method has access to the targets' namespace via globals()
MinRK
prep newparallel for rebase...
r3539
"""
MinRK
allow load-balancing across subsets of engines
r3625 d = self._defaults('block', 'bound')
MinRK
API update involving map and load-balancing
r3635 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 def abort(self, jobs=None, block=None):
MinRK
basic LoadBalancedView, RemoteFunction
r3559 """Abort jobs on my engines.
Parameters
----------
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 jobs : None, str, list of strs, optional
MinRK
basic LoadBalancedView, RemoteFunction
r3559 if None: abort all jobs.
else: abort specific msg_id(s).
"""
block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.abort(jobs=jobs, targets=self._targets, block=block)
MinRK
basic LoadBalancedView, RemoteFunction
r3559
def queue_status(self, verbose=False):
"""Fetch the Queue status of my engines"""
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.queue_status(targets=self._targets, verbose=verbose)
MinRK
basic LoadBalancedView, RemoteFunction
r3559
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 def purge_results(self, jobs=[], targets=[]):
MinRK
basic LoadBalancedView, RemoteFunction
r3559 """Instruct the controller to forget specific results."""
if targets is None or targets == 'all':
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 targets = self._targets
return self.client.purge_results(jobs=jobs, targets=targets)
@spin_after
def get_result(self, indices_or_msg_ids=None):
"""return one or more results, specified by history index or msg_id.
MinRK
API update involving map and load-balancing
r3635
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 See client.get_result for details.
"""
if indices_or_msg_ids is None:
indices_or_msg_ids = -1
if isinstance(indices_or_msg_ids, int):
indices_or_msg_ids = self.history[indices_or_msg_ids]
elif isinstance(indices_or_msg_ids, (list,tuple,set)):
indices_or_msg_ids = list(indices_or_msg_ids)
for i,index in enumerate(indices_or_msg_ids):
if isinstance(index, int):
indices_or_msg_ids[i] = self.history[index]
return self.client.get_result(indices_or_msg_ids)
MinRK
API update involving map and load-balancing
r3635 #-------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # Map
#-------------------------------------------------------------------
def map(self, f, *sequences, **kwargs):
"""override in subclasses"""
raise NotImplementedError
def map_async(self, f, *sequences, **kwargs):
"""Parallel version of builtin `map`, using this view's engines.
This is equivalent to map(...block=False)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 See `self.map` for details.
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 """
if 'block' in kwargs:
raise TypeError("map_async doesn't take a `block` keyword argument.")
kwargs['block'] = False
return self.map(f,*sequences,**kwargs)
def map_sync(self, f, *sequences, **kwargs):
"""Parallel version of builtin `map`, using this view's engines.
This is equivalent to map(...block=True)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 See `self.map` for details.
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 """
if 'block' in kwargs:
raise TypeError("map_sync doesn't take a `block` keyword argument.")
kwargs['block'] = True
return self.map(f,*sequences,**kwargs)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 def imap(self, f, *sequences, **kwargs):
"""Parallel version of `itertools.imap`.
See `self.map` for details.
"""
return iter(self.map_async(f,*sequences, **kwargs))
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 #-------------------------------------------------------------------
MinRK
API update involving map and load-balancing
r3635 # Decorators
#-------------------------------------------------------------------
def remote(self, bound=True, block=True):
"""Decorator for making a RemoteFunction"""
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 def parallel(self, dist='b', bound=True, block=None):
"""Decorator for making a ParallelFunction"""
block = self.block if block is None else block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
MinRK
view decorators for syncing history/results
r3543
MinRK
testing fixes
r3641 @testdec.skip_doctest
MinRK
prep newparallel for rebase...
r3539 class DirectView(View):
MinRK
some docstring cleanup
r3584 """Direct Multiplexer View of one or more engines.
These are created via indexed access to a client:
>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
MinRK
dependency tweaks + dependency/scheduler docs
r3624 This object provides dictionary access to engine namespaces:
# push a=5:
>>> dv['a'] = 5
# pull 'foo':
>>> db['foo']
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
some docstring cleanup
r3584 """
MinRK
prep newparallel for rebase...
r3539
MinRK
API update involving map and load-balancing
r3635 def __init__(self, client=None, targets=None):
super(DirectView, self).__init__(client=client, targets=targets)
self._balanced = False
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
Parallel version of builtin `map`, using this View's `targets`.
MinRK
API update involving map and load-balancing
r3635
There will be one task per target, so work will be chunked
if the sequences are longer than `targets`.
Results can be iterated as they are ready, but will become available in chunks.
Parameters
----------
f : callable
function to be mapped
*sequences: one or more sequences of matching length
the sequences to be distributed and passed to `f`
block : bool
whether to wait for the result or not [default self.block]
bound : bool
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 whether to have access to the engines' namespaces [default self.bound]
MinRK
API update involving map and load-balancing
r3635
Returns
-------
if block=False:
AsyncMapResult
An object like AsyncResult, but which reassembles the sequence of results
into a single list. AsyncMapResults can be iterated through before all
results are complete.
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 else:
list
MinRK
API update involving map and load-balancing
r3635 the result of map(f,*sequences)
"""
block = kwargs.get('block', self.block)
bound = kwargs.get('bound', self.bound)
for k in kwargs.keys():
if k not in ['block', 'bound']:
raise TypeError("invalid keyword arg, %r"%k)
assert len(sequences) > 0, "must have some sequences to map onto!"
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 pf = ParallelFunction(self.client, f, block=block, bound=bound,
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 targets=self._targets, balanced=False)
MinRK
API update involving map and load-balancing
r3635 return pf.map(*sequences)
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
MinRK
eliminate relative imports
r3642 def execute(self, code, block=None):
MinRK
tweaks related to docs + add activate() for magics
r3590 """execute some code on my targets."""
MinRK
eliminate relative imports
r3642
block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.execute(code, block=block, targets=self._targets)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
eliminate relative imports
r3642 @sync_results
@save_ids
def run(self, fname, block=None):
"""execute the code in a file on my targets."""
block = block if block is not None else self.block
return self.client.run(fname, block=block, targets=self._targets)
MinRK
prep newparallel for rebase...
r3539 def update(self, ns):
"""update remote namespace with dict `ns`"""
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.push(ns, targets=self._targets, block=self.block)
MinRK
prep newparallel for rebase...
r3539
MinRK
eliminate relative imports
r3642 def push(self, ns, block=None):
"""update remote namespace with dict `ns`"""
block = block if block is not None else self.block
return self.client.push(ns, targets=self._targets, block=block)
MinRK
testing fixes
r3641
MinRK
prep newparallel for rebase...
r3539 def get(self, key_s):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
# block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.pull(key_s, block=True, targets=self._targets)
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def pull(self, key_s, block=True):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.pull(key_s, block=block, targets=self._targets)
MinRK
prep newparallel for rebase...
r3539
MinRK
testing fixes
r3641 def scatter(self, key, seq, dist='b', flatten=False, block=None):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """
Partition a Python sequence and send the partitions to a set of engines.
"""
block = block if block is not None else self.block
return self.client.scatter(key, seq, dist=dist, flatten=flatten,
MinRK
testing fixes
r3641 targets=self._targets, block=block)
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
MinRK
testing fixes
r3641 def gather(self, key, dist='b', block=None):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """
Gather a partitioned sequence on a set of engines as a single local seq.
"""
block = block if block is not None else self.block
MinRK
testing fixes
r3641 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
prep newparallel for rebase...
r3539 def __getitem__(self, key):
return self.get(key)
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def __setitem__(self,key, value):
MinRK
prep newparallel for rebase...
r3539 self.update({key:value})
MinRK
control channel progress
r3540 def clear(self, block=False):
"""Clear the remote namespaces on my engines."""
block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.clear(targets=self._targets, block=block)
MinRK
control channel progress
r3540
def kill(self, block=True):
"""Kill my engines."""
block = block if block is not None else self.block
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 return self.client.kill(targets=self._targets, block=block)
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 #----------------------------------------
# activate for %px,%autopx magics
#----------------------------------------
def activate(self):
"""Make this `View` active for parallel magic commands.
IPython has a magic command syntax to work with `MultiEngineClient` objects.
In a given IPython session there is a single active one. While
there can be many `Views` created and used by the user,
there is only one active one. The active `View` is used whenever
the magic commands %px and %autopx are used.
The activate() method is called on a given `View` to make it
active. Once this has been done, the magic commands can be used.
"""
try:
# This is injected into __builtins__.
ip = get_ipython()
except NameError:
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
else:
pmagic = ip.plugin_manager.get_plugin('parallelmagic')
if pmagic is not None:
pmagic.active_multiengine_client = self
else:
print "You must first load the parallelmagic extension " \
"by doing '%load_ext parallelmagic'"
MinRK
testing fixes
r3641
@testdec.skip_doctest
MinRK
prep newparallel for rebase...
r3539 class LoadBalancedView(View):
MinRK
API update involving map and load-balancing
r3635 """An load-balancing View that only executes via the Task scheduler.
MinRK
some docstring cleanup
r3584
MinRK
API update involving map and load-balancing
r3635 Load-balanced views can be created with the client's `view` method:
MinRK
some docstring cleanup
r3584
MinRK
API update involving map and load-balancing
r3635 >>> v = client.view(balanced=True)
MinRK
some docstring cleanup
r3584
MinRK
API update involving map and load-balancing
r3635 or targets can be specified, to restrict the potential destinations:
MinRK
some docstring cleanup
r3584
MinRK
allow load-balancing across subsets of engines
r3625 >>> v = client.view([1,3],balanced=True)
which would restrict loadbalancing to between engines 1 and 3.
MinRK
some docstring cleanup
r3584
"""
MinRK
basic LoadBalancedView, RemoteFunction
r3559
MinRK
API update involving map and load-balancing
r3635 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
MinRK
allow load-balancing across subsets of engines
r3625
MinRK
API update involving map and load-balancing
r3635 def __init__(self, client=None, targets=None):
super(LoadBalancedView, self).__init__(client=client, targets=targets)
MinRK
allow load-balancing across subsets of engines
r3625 self._ntargets = 1
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 self._balanced = True
MinRK
API update involving map and load-balancing
r3635
def _validate_dependency(self, dep):
"""validate a dependency.
For use in `set_flags`.
"""
if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
return True
elif isinstance(dep, (list,set, tuple)):
for d in dep:
if not isinstance(d, str, AsyncResult):
return False
elif isinstance(dep, dict):
if set(dep.keys()) != set(Dependency().as_dict().keys()):
return False
if not isinstance(dep['msg_ids'], list):
return False
for d in dep['msg_ids']:
if not isinstance(d, str):
return False
else:
return False
def set_flags(self, **kwargs):
"""set my attribute flags by keyword.
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 A View is a wrapper for the Client's apply method, but with attributes
that specify keyword arguments, those attributes can be set by keyword
argument with this method.
MinRK
API update involving map and load-balancing
r3635
Parameters
----------
block : bool
whether to wait for results
bound : bool
whether to use the engine's namespace
follow : Dependency, list, msg_id, AsyncResult
the location dependencies of tasks
after : Dependency, list, msg_id, AsyncResult
the time dependencies of tasks
timeout : int,None
the timeout to be used for tasks
"""
super(LoadBalancedView, self).set_flags(**kwargs)
for name in ('follow', 'after'):
if name in kwargs:
value = kwargs[name]
if self._validate_dependency(value):
setattr(self, name, value)
else:
raise ValueError("Invalid dependency: %r"%value)
if 'timeout' in kwargs:
t = kwargs['timeout']
if not isinstance(t, (int, long, float, None)):
raise TypeError("Invalid type for timeout: %r"%type(t))
if t is not None:
if t < 0:
raise ValueError("Invalid timeout: %s"%t)
self.timeout = t
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
Parallel version of builtin `map`, load-balanced by this View.
MinRK
API update involving map and load-balancing
r3635
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 `block`, `bound`, and `chunk_size` can be specified by keyword only.
Each `chunk_size` elements will be a separate task, and will be
load-balanced. This lets individual elements be available for iteration
as soon as they arrive.
MinRK
API update involving map and load-balancing
r3635
Parameters
----------
f : callable
function to be mapped
*sequences: one or more sequences of matching length
the sequences to be distributed and passed to `f`
block : bool
whether to wait for the result or not [default self.block]
bound : bool
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 whether to use the engine's namespace [default self.bound]
chunk_size : int
how many elements should be in each task [default 1]
MinRK
API update involving map and load-balancing
r3635
Returns
-------
if block=False:
AsyncMapResult
An object like AsyncResult, but which reassembles the sequence of results
into a single list. AsyncMapResults can be iterated through before all
results are complete.
else:
the result of map(f,*sequences)
"""
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # default
MinRK
API update involving map and load-balancing
r3635 block = kwargs.get('block', self.block)
bound = kwargs.get('bound', self.bound)
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 chunk_size = kwargs.get('chunk_size', 1)
keyset = set(kwargs.keys())
extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
if extra_keys:
raise TypeError("Invalid kwargs: %s"%list(extra_keys))
MinRK
API update involving map and load-balancing
r3635
assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(self.client, f, block=block, bound=bound,
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 targets=self._targets, balanced=True,
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 chunk_size=chunk_size)
MinRK
API update involving map and load-balancing
r3635 return pf.map(*sequences)
MinRK
cleanup pass
r3644 __all__ = ['LoadBalancedView', 'DirectView']