##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3598:6f48516f
r3606:9f1a03ab
Show More
view.py
354 lines | 12.0 KiB | text/x-python | PythonLexer
MinRK
some docstring cleanup
r3584 """Views of remote engines"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
from IPython.external.decorator import decorator
MinRK
tweaks related to docs + add activate() for magics
r3590 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
MinRK
prep newparallel for rebase...
r3539
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
@decorator
def myblock(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """override client.block with self.block during a call"""
MinRK
prep newparallel for rebase...
r3539 block = self.client.block
self.client.block = self.block
MinRK
tweaks related to docs + add activate() for magics
r3590 try:
ret = f(self, *args, **kwargs)
finally:
self.client.block = block
MinRK
prep newparallel for rebase...
r3539 return ret
MinRK
view decorators for syncing history/results
r3543 @decorator
def save_ids(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """Keep our history and outstanding attributes up to date after a method call."""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 n_previous = len(self.client.history)
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 nmsgs = len(self.client.history) - n_previous
msg_ids = self.client.history[-nmsgs:]
MinRK
view decorators for syncing history/results
r3543 self.history.extend(msg_ids)
map(self.outstanding.add, msg_ids)
return ret
@decorator
def sync_results(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """sync relevant results from self.client to our results attribute."""
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
delta = self.outstanding.difference(self.client.outstanding)
completed = self.outstanding.intersection(delta)
self.outstanding = self.outstanding.difference(completed)
for msg_id in completed:
self.results[msg_id] = self.client.results[msg_id]
return ret
@decorator
def spin_after(f, self, *args, **kwargs):
MinRK
some docstring cleanup
r3584 """call spin after the method."""
MinRK
view decorators for syncing history/results
r3543 ret = f(self, *args, **kwargs)
self.spin()
return ret
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 class View(object):
MinRK
some docstring cleanup
r3584 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
Don't use this class, use subclasses.
"""
MinRK
prep newparallel for rebase...
r3539 _targets = None
block=None
MinRK
basic LoadBalancedView, RemoteFunction
r3559 bound=None
MinRK
view decorators for syncing history/results
r3543 history=None
MinRK
prep newparallel for rebase...
r3539
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def __init__(self, client, targets=None):
MinRK
prep newparallel for rebase...
r3539 self.client = client
self._targets = targets
MinRK
basic LoadBalancedView, RemoteFunction
r3559 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
MinRK
prep newparallel for rebase...
r3539 self.block = client.block
MinRK
tweaks related to docs + add activate() for magics
r3590 self.bound=False
MinRK
view decorators for syncing history/results
r3543 self.history = []
self.outstanding = set()
self.results = {}
MinRK
prep newparallel for rebase...
r3539 def __repr__(self):
strtargets = str(self._targets)
if len(strtargets) > 16:
strtargets = strtargets[:12]+'...]'
return "<%s %s>"%(self.__class__.__name__, strtargets)
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 @property
def targets(self):
return self._targets
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539 @targets.setter
def targets(self, value):
MinRK
tweaks related to docs + add activate() for magics
r3590 self._targets = value
# raise AttributeError("Cannot set my targets argument after construction!")
MinRK
view decorators for syncing history/results
r3543
@sync_results
def spin(self):
"""spin the client, and sync"""
self.client.spin()
@sync_results
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines, returning the result.
This method does not involve the engine's namespace.
if self.block is False:
returns msg_id
else:
returns actual result of f(*args, **kwargs)
"""
MinRK
basic LoadBalancedView, RemoteFunction
r3559 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
MinRK
view decorators for syncing history/results
r3543
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_async(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
This method does not involve the engine's namespace.
returns msg_id
"""
return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
MinRK
view decorators for syncing history/results
r3543
@spin_after
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_sync(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
returning the result.
This method does not involve the engine's namespace.
returns: actual result of f(*args, **kwargs)
"""
return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
MinRK
view decorators for syncing history/results
r3543
@sync_results
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s).
if self.block is False:
returns msg_id
else:
returns actual result of f(*args, **kwargs)
This method has access to the targets' globals
"""
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
MinRK
view decorators for syncing history/results
r3543
@sync_results
@save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_async_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s)
in a nonblocking manner.
returns: msg_id
This method has access to the targets' globals
"""
return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
MinRK
view decorators for syncing history/results
r3543
MinRK
whitespace
r3544 @spin_after
MinRK
view decorators for syncing history/results
r3543 @save_ids
MinRK
prep newparallel for rebase...
r3539 def apply_sync_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
returns: actual result of f(*args, **kwargs)
This method has access to the targets' globals
"""
MinRK
control channel progress
r3540 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
MinRK
basic LoadBalancedView, RemoteFunction
r3559
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 @spin_after
@save_ids
def map(self, f, *sequences):
"""Parallel version of builtin `map`, using this view's engines."""
if isinstance(self.targets, int):
targets = [self.targets]
MinRK
improved client.get_results() behavior
r3598 else:
targets = self.targets
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 pf = ParallelFunction(self.client, f, block=self.block,
bound=True, targets=targets)
return pf.map(*sequences)
MinRK
tweaks related to docs + add activate() for magics
r3590 def parallel(self, bound=True, block=True):
"""Decorator for making a ParallelFunction"""
return parallel(self.client, bound=bound, targets=self.targets, block=block)
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def abort(self, msg_ids=None, block=None):
"""Abort jobs on my engines.
Parameters
----------
msg_ids : None, str, list of strs, optional
if None: abort all jobs.
else: abort specific msg_id(s).
"""
block = block if block is not None else self.block
return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
def queue_status(self, verbose=False):
"""Fetch the Queue status of my engines"""
return self.client.queue_status(targets=self.targets, verbose=verbose)
MinRK
tweaks related to docs + add activate() for magics
r3590 def purge_results(self, msg_ids=[], targets=[]):
MinRK
basic LoadBalancedView, RemoteFunction
r3559 """Instruct the controller to forget specific results."""
if targets is None or targets == 'all':
targets = self.targets
return self.client.purge_results(msg_ids=msg_ids, targets=targets)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
r3539
class DirectView(View):
MinRK
some docstring cleanup
r3584 """Direct Multiplexer View of one or more engines.
These are created via indexed access to a client:
>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
MinRK
tweaks related to docs + add activate() for magics
r3590 This object provides dictionary access
MinRK
some docstring cleanup
r3584 """
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
def execute(self, code, block=True):
"""execute some code on my targets."""
return self.client.execute(code, block=self.block, targets=self.targets)
MinRK
prep newparallel for rebase...
r3539 def update(self, ns):
"""update remote namespace with dict `ns`"""
return self.client.push(ns, targets=self.targets, block=self.block)
MinRK
basic LoadBalancedView, RemoteFunction
r3559 push = update
MinRK
prep newparallel for rebase...
r3539 def get(self, key_s):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
# block = block if block is not None else self.block
MinRK
basic LoadBalancedView, RemoteFunction
r3559 return self.client.pull(key_s, block=True, targets=self.targets)
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def pull(self, key_s, block=True):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
block = block if block is not None else self.block
return self.client.pull(key_s, block=block, targets=self.targets)
MinRK
prep newparallel for rebase...
r3539
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
"""
Partition a Python sequence and send the partitions to a set of engines.
"""
block = block if block is not None else self.block
MinRK
match return shape in AsyncResult to sync results
r3593 targets = targets if targets is not None else self.targets
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
return self.client.scatter(key, seq, dist=dist, flatten=flatten,
targets=targets, block=block)
MinRK
tweaks related to docs + add activate() for magics
r3590 @sync_results
@save_ids
MinRK
match return shape in AsyncResult to sync results
r3593 def gather(self, key, dist='b', targets=None, block=None):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """
Gather a partitioned sequence on a set of engines as a single local seq.
"""
block = block if block is not None else self.block
MinRK
match return shape in AsyncResult to sync results
r3593 targets = targets if targets is not None else self.targets
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
return self.client.gather(key, dist=dist, targets=targets, block=block)
MinRK
prep newparallel for rebase...
r3539 def __getitem__(self, key):
return self.get(key)
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def __setitem__(self,key, value):
MinRK
prep newparallel for rebase...
r3539 self.update({key:value})
MinRK
control channel progress
r3540 def clear(self, block=False):
"""Clear the remote namespaces on my engines."""
block = block if block is not None else self.block
MinRK
basic LoadBalancedView, RemoteFunction
r3559 return self.client.clear(targets=self.targets, block=block)
MinRK
control channel progress
r3540
def kill(self, block=True):
"""Kill my engines."""
block = block if block is not None else self.block
MinRK
basic LoadBalancedView, RemoteFunction
r3559 return self.client.kill(targets=self.targets, block=block)
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 #----------------------------------------
# activate for %px,%autopx magics
#----------------------------------------
def activate(self):
"""Make this `View` active for parallel magic commands.
IPython has a magic command syntax to work with `MultiEngineClient` objects.
In a given IPython session there is a single active one. While
there can be many `Views` created and used by the user,
there is only one active one. The active `View` is used whenever
the magic commands %px and %autopx are used.
The activate() method is called on a given `View` to make it
active. Once this has been done, the magic commands can be used.
"""
try:
# This is injected into __builtins__.
ip = get_ipython()
except NameError:
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
else:
pmagic = ip.plugin_manager.get_plugin('parallelmagic')
if pmagic is not None:
pmagic.active_multiengine_client = self
else:
print "You must first load the parallelmagic extension " \
"by doing '%load_ext parallelmagic'"
MinRK
prep newparallel for rebase...
r3539 class LoadBalancedView(View):
MinRK
some docstring cleanup
r3584 """An engine-agnostic View that only executes via the Task queue.
Typically created via:
>>> lbv = client[None]
<LoadBalancedView tcp://127.0.0.1:12345>
but can also be created with:
>>> lbc = LoadBalancedView(client)
TODO: allow subset of engines across which to balance.
"""
MinRK
basic LoadBalancedView, RemoteFunction
r3559 def __repr__(self):
return "<%s %s>"%(self.__class__.__name__, self.client._addr)
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 @property
def targets(self):
return None
@targets.setter
def targets(self, value):
raise AttributeError("Cannot set targets for LoadbalancedView!")
MinRK
prep newparallel for rebase...
r3539