##// END OF EJS Templates
remove all trailling spaces
remove all trailling spaces

File last commit:

r4872:34c10438
r4872:34c10438
Show More
view.py
1048 lines | 35.3 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Views of remote engines.
Authors:
* Min RK
"""
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
some docstring cleanup
r3584 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 import imp
import sys
MinRK
update API after sagedays29...
r3664 import warnings
from contextlib import contextmanager
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 from types import ModuleType
MinRK
update API after sagedays29...
r3664
import zmq
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 from IPython.testing.skipdoctest import skip_doctest
MinRK
add retries flag to LoadBalancedView...
r3873 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt
MinRK
prep newparallel for rebase...
r3539 from IPython.external.decorator import decorator
MinRK
eliminate relative imports
r3642
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import util
from IPython.parallel.controller.dependency import Dependency, dependent
MinRK
update API after sagedays29...
r3664 from . import map as Map
from .asyncresult import AsyncResult, AsyncMapResult
MinRK
eliminate relative imports
r3642 from .remotefunction import ParallelFunction, parallel, remote
MinRK
prep newparallel for rebase...
r3539
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Every msg_id that was appended to ``self.client.history`` while `f` ran is
    recorded in ``self.history`` and added to ``self.outstanding``.  The
    bookkeeping runs in a ``finally`` clause, so messages submitted before `f`
    raised are still recorded.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # Slice from the previous length rather than with a negative count:
        # history[-0:] would select the *entire* history when nothing new
        # was submitted.
        msg_ids = self.client.history[n_previous:]
        self.history.extend(msg_ids)
        self.outstanding.update(msg_ids)
    return ret
@decorator
def sync_results(f, self, *args, **kwargs):
    """Copy newly finished results from self.client into self.results.

    After calling `f`, every msg_id that is in ``self.outstanding`` but not in
    ``self.client.outstanding`` is treated as completed: it is dropped from
    ``self.outstanding`` and its entry in ``self.client.results`` is copied
    into ``self.results``.
    """
    outcome = f(self, *args, **kwargs)
    # msg_ids we are still tracking that the client no longer lists as outstanding
    finished = self.outstanding.difference(self.client.outstanding)
    self.outstanding = self.outstanding.difference(finished)
    client_results = self.client.results
    for done_id in finished:
        self.results[done_id] = client_results[done_id]
    return outcome
@decorator
def spin_after(f, self, *args, **kwargs):
    """Run the wrapped method, then call ``self.spin()`` before returning its result."""
    outcome = f(self, *args, **kwargs)
    self.spin()
    return outcome
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
view decorators for syncing history/results
r3543
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    Don't use this class, use subclasses.

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        get_result, queue_status, purge_results, result_status

    control methods
        abort, shutdown

    """
    # flags
    block = Bool(False)
    track = Bool(True)
    targets = Any()

    # per-view bookkeeping, maintained by the save_ids / sync_results decorators
    history = List()
    outstanding = Set()
    results = Dict()
    client = Instance('IPython.parallel.Client')

    _socket = Instance('zmq.Socket')
    # names accepted by set_flags / temp_flags
    _flag_names = List(['targets', 'block', 'track'])
    _targets = Any()
    _idents = Any()

    def __init__(self, client=None, socket=None, **flags):
        """Create a view on `client` that sends over `socket`.

        ``block`` is initialised from ``client.block``; any additional
        keyword arguments are applied with `set_flags`.
        """
        super(View, self).__init__(client=client, _socket=socket)
        self.block = client.block

        self.set_flags(**flags)

        assert self.__class__ is not View, "Don't use base View objects, use subclasses"

    def __repr__(self):
        strtargets = str(self.targets)
        # keep the repr short when there are many targets
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        Views determine behavior with a few attributes (`block`, `track`, etc.).
        These attributes can be set all at once by name with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        Raises
        ------

        KeyError : if a keyword is not one of the names in `_flag_names`
        """
        for name, value in kwargs.items():
            if name not in self._flag_names:
                raise KeyError("Invalid name: %r"%name)
            else:
                setattr(self, name, value)

    @contextmanager
    def temp_flags(self, **kwargs):
        """temporarily set flags, for use in `with` statements.

        See set_flags for permanent setting of flags

        Examples
        --------

        >>> view.track=False
        ...
        >>> with view.temp_flags(track=True):
        ...    ar = view.apply(dostuff, my_big_array)
        ...    ar.tracker.wait() # wait for send to finish
        >>> view.track
        False

        """
        # preflight: save flags
        saved_flags = {}
        for f in self._flag_names:
            saved_flags[f] = getattr(self, f)
        try:
            # set temporaries inside the try, so that a rejected flag name
            # cannot leave the flags that were already applied stuck
            self.set_flags(**kwargs)
            # yield to the with-statement block
            yield
        finally:
            # postflight: restore saved flags
            self.set_flags(**saved_flags)

    #----------------------------------------------------------------
    # apply
    #----------------------------------------------------------------

    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """wrapper for client.send_apply_message"""
        raise NotImplementedError("Implement in subclasses")

    def apply(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.

        if self.block is False:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs)
        """
        return self._really_apply(f, args, kwargs)

    def apply_async(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a nonblocking manner.

        returns AsyncResult
        """
        return self._really_apply(f, args, kwargs, block=False)

    @spin_after
    def apply_sync(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a blocking manner,
         returning the result.

        returns: actual result of f(*args, **kwargs)
        """
        return self._really_apply(f, args, kwargs, block=True)

    #----------------------------------------------------------------
    # wrappers for client and control methods
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    def wait(self, jobs=None, timeout=-1):
        """waits on one or more `jobs`, for up to `timeout` seconds.

        Parameters
        ----------

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
                ints are indices to self.history
                strs are msg_ids
                default: wait on every job in this view's history
        timeout : float
                a time in seconds, after which to give up.
                default is -1, which means no timeout

        Returns
        -------

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        if jobs is None:
            jobs = self.history
        return self.client.wait(jobs, timeout)

    def abort(self, jobs=None, targets=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------

        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        targets : optional
            defaults to self.targets when None
        block : bool, optional
            defaults to self.block when None
        """
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        return self.client.abort(jobs=jobs, targets=targets, block=block)

    def queue_status(self, targets=None, verbose=False):
        """Fetch the Queue status of my engines"""
        targets = targets if targets is not None else self.targets
        return self.client.queue_status(targets=targets, verbose=verbose)

    # NOTE(review): the list defaults are kept for backward compatibility; they
    # are only forwarded to the client and never mutated in this method.
    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results."""
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    def shutdown(self, targets=None, restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.
        """
        block = self.block if block is None else block
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)

    @spin_after
    def get_result(self, indices_or_msg_ids=None):
        """return one or more results, specified by history index or msg_id.

        Integer entries are looked up in this view's own history before being
        passed on; the default (None) selects the most recent entry.

        See client.get_result for details.

        """

        if indices_or_msg_ids is None:
            indices_or_msg_ids = -1
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            # work on a copy, so the caller's container is not modified
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------

    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=False)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=True)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of `itertools.imap`.

        Returns an iterator over the result of `map_async`.

        See `self.map` for details.

        """

        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------

    def remote(self, block=True, **flags):
        """Decorator for making a RemoteFunction

        `block` defaults to True; pass ``block=None`` to use self.block.
        """
        block = self.block if block is None else block
        return remote(self, block=block, **flags)

    def parallel(self, dist='b', block=None, **flags):
        """Decorator for making a ParallelFunction

        `block` defaults to self.block when None.
        """
        block = self.block if block is None else block
        return parallel(self, dist=dist, block=block, **flags)
MinRK
view decorators for syncing history/results
r3543
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 @skip_doctest
MinRK
prep newparallel for rebase...
r3539 class DirectView(View):
MinRK
some docstring cleanup
r3584 """Direct Multiplexer View of one or more engines.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
some docstring cleanup
r3584 These are created via indexed access to a client:
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
some docstring cleanup
r3584 >>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
dependency tweaks + dependency/scheduler docs
r3624 This object provides dictionary access to engine namespaces:
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
dependency tweaks + dependency/scheduler docs
r3624 # push a=5:
Bernardo B. Marques
remove all trailling spaces
r4872 >>> dv['a'] = 5
MinRK
dependency tweaks + dependency/scheduler docs
r3624 # pull 'foo':
>>> dv['foo']
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
some docstring cleanup
r3584 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def __init__(self, client=None, socket=None, targets=None):
    """Create a view on `client` over `socket`, with `targets` passed on as a flag."""
    flags = dict(targets=targets)
    super(DirectView, self).__init__(client=client, socket=socket, **flags)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
@property
def importer(self):
    """Property shorthand for ``sync_imports(local=True)``.

    See sync_imports for details.

    """
    return self.sync_imports(local=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
@contextmanager
def sync_imports(self, local=True):
    """Context Manager for performing simultaneous local and remote imports.

    'import x as y' will *not* work.  The 'as y' part will simply be ignored.

    >>> with view.sync_imports():
    ...    from numpy import recarray
    importing recarray from numpy on engine(s)

    While the block runs, ``__builtin__.__import__`` is replaced by a hook
    that performs the import locally and also submits it to the engines with
    `apply_async`.  The original ``__import__`` is always restored on exit.
    After the block, the remote results are fetched, so remote ImportErrors
    are raised here.  A local ImportError inside the block propagates to the
    caller when `local` is True.
    """
    import __builtin__
    local_import = __builtin__.__import__
    modules = set()
    results = []
    @util.interactive
    def remote_import(name, fromlist, level):
        """the function to be passed to apply, that actually performs the import
        on the engine, and loads up the user namespace.
        """
        import sys
        user_ns = globals()
        mod = __import__(name, fromlist=fromlist, level=level)
        if fromlist:
            for key in fromlist:
                user_ns[key] = getattr(mod, key)
        else:
            user_ns[name] = sys.modules[name]

    def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
        """the drop-in replacement for __import__, that optionally imports
        locally as well.
        """
        # don't override nested imports
        save_import = __builtin__.__import__
        __builtin__.__import__ = local_import
        try:
            if imp.lock_held():
                # this is a side-effect import, don't do it remotely, or even
                # ignore the local effects
                return local_import(name, globals, locals, fromlist, level)

            imp.acquire_lock()
            try:
                if local:
                    mod = local_import(name, globals, locals, fromlist, level)
                else:
                    raise NotImplementedError("remote-only imports not yet implemented")
            finally:
                # release even when the import fails, so the lock is not leaked
                imp.release_lock()

            key = name+':'+','.join(fromlist or [])
            if level == -1 and key not in modules:
                modules.add(key)
                if fromlist:
                    print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
                else:
                    print("importing %s on engine(s)"%name)
                results.append(self.apply_async(remote_import, name, fromlist, level))

            return mod
        finally:
            # restore override on every exit path (early return and errors too)
            __builtin__.__import__ = save_import

    # override __import__
    __builtin__.__import__ = view_import
    try:
        # enter the block
        yield
    except ImportError:
        if local:
            # a failed local import is a real error for the caller
            raise
        # ignore import errors if not doing local imports
    finally:
        # always restore __import__
        __builtin__.__import__ = local_import

    for r in results:
        # raise possible remote ImportErrors here
        r.get()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@sync_results
@save_ids
def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
    """calls f(*args, **kwargs) on remote engines, returning the result.

    Any flag left as None falls back to the attribute of the same name on
    this View.  One apply message is sent for each ident that the client
    builds from `targets`.

    Parameters
    ----------

    f : callable

    args : list [default: empty]

    kwargs : dict [default: empty]

    targets : target list [default: self.targets]
        where to run
    block : bool [default: self.block]
        whether to block
    track : bool [default: self.track]
        whether to ask zmq to track the message, for safe non-copying sends

    Returns
    -------

    if block is false:
        returns AsyncResult
    else:
        returns the value of AsyncResult.get(); if that wait is interrupted
        by KeyboardInterrupt, the AsyncResult itself is returned instead
    """
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    engine_idents = self.client._build_targets(targets)[0]
    sent_ids = []
    sent_trackers = []
    for engine_ident in engine_idents:
        reply = self.client.send_apply_message(self._socket, f, args, kwargs,
                                               track=track, ident=engine_ident)
        if track:
            sent_trackers.append(reply['tracker'])
        sent_ids.append(reply['header']['msg_id'])

    if track is False:
        tracker = None
    else:
        tracker = zmq.MessageTracker(*sent_trackers)
    async_result = AsyncResult(self.client, sent_ids, fname=f.__name__,
                               targets=targets, tracker=tracker)
    if not block:
        return async_result
    try:
        return async_result.get()
    except KeyboardInterrupt:
        # interrupted while waiting: hand back the AsyncResult instead
        return async_result
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_after
def map(self, f, *sequences, **kwargs):
    """view.map(f, *sequences, block=self.block) => list|AsyncMapResult

    Parallel version of builtin `map`, using this View's `targets`.

    There will be one task per target, so work will be chunked
    if the sequences are longer than `targets`.

    Results can be iterated as they are ready, but will become available in chunks.

    Parameters
    ----------

    f : callable
        function to be mapped
    *sequences: one or more sequences of matching length
        the sequences to be distributed and passed to `f`
    block : bool
        whether to wait for the result or not [default self.block]

    Returns
    -------

    if block=False:
        AsyncMapResult
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
    else:
        list
            the result of map(f,*sequences)
    """
    default_block = self.block
    block = kwargs.pop('block', default_block)
    # only flag keywords may remain after `block` has been taken out
    accepted = ('block', 'track')
    for key in kwargs:
        if key not in accepted:
            raise TypeError("invalid keyword arg, %r"%key)

    assert len(sequences) > 0, "must have some sequences to map onto!"
    mapper = ParallelFunction(self, f, block=block, **kwargs)
    return mapper.map(*sequences)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def execute(self, code, targets=None, block=None):
    """Run the string `code` on engines, blocking or not.

    ``execute`` is always `bound` (affects engine namespace)

    Parameters
    ----------

    code : str
        the code string to be executed
    targets :
        forwarded unchanged to `_really_apply`
    block : bool
        whether or not to wait until done to return
        default: self.block

    Returns
    -------

    whatever `_really_apply` returns for ``util._execute`` called with `code`
    """
    call_args = (code,)
    return self._really_apply(util._execute, args=call_args, block=block, targets=targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def run(self, filename, targets=None, block=None):
    """Execute contents of `filename` on my engine(s).

    The file is read locally and its text is handed to `execute`.

    Parameters
    ----------

    filename : str
        The path to the file
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done
        default: self.block

    Returns
    -------

    the return value of `execute`
    """
    with open(filename, 'r') as source_file:
        contents = source_file.read()
    # A trailing newline is appended because trailing indented whitespace
    # at the end of the file would otherwise cause a SyntaxError.
    code = contents + '\n'
    return self.execute(code, block=block, targets=targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def update(self, ns):
    """Update the remote namespace with dict `ns`.

    Calls `push` with this view's current `block` and `track` flags;
    see `push` for details.
    """
    flags = dict(block=self.block, track=self.track)
    return self.push(ns, **flags)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def push(self, ns, targets=None, block=None, track=None):
    """update remote namespace with dict `ns`

    Parameters
    ----------

    ns : dict
        dict of keys with which to update engine namespace(s)
    targets : [default : self.targets]
        forwarded to `_really_apply`
    block : bool [default : self.block]
        whether to wait to be notified of engine receipt
    track : bool [default : self.track]
        forwarded to `_really_apply`

    Raises
    ------

    TypeError
        if `ns` is not a dict
    """

    # Any flag left as None falls back to the view's own setting.
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    if isinstance(ns, dict):
        return self._really_apply(util._push, (ns,), block=block, track=track, targets=targets)
    raise TypeError("Must be a dict, not %s"%type(ns))
MinRK
testing fixes
r3641
MinRK
prep newparallel for rebase...
def get(self, key_s):
    """get object(s) by `key_s` from remote namespace

    Always blocking: this calls `pull` with ``block=True``.
    see `pull` for details.
    """
    fetched = self.pull(key_s, block=True)
    return fetched
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel_magic for Views...
def pull(self, names, targets=None, block=None):
    """get object(s) by `name` from remote namespace

    will return one object if it is a key.
    can also take a list of keys, in which case it will return a list of objects.

    Parameters
    ----------

    names : str, or list/tuple/set of str
        the name(s) to fetch from the engine namespace(s)
    targets : [default : self.targets]
        forwarded to `_really_apply`
    block : bool [default : self.block]
        forwarded to `_really_apply`

    Raises
    ------

    TypeError
        if `names` is neither a string nor a list/tuple/set of strings
    """
    block = block if block is not None else self.block
    targets = targets if targets is not None else self.targets
    # Validate locally, before anything is sent to the engines.
    if isinstance(names, basestring):
        pass
    elif isinstance(names, (list,tuple,set)):
        for key in names:
            if not isinstance(key, basestring):
                raise TypeError("keys must be str, not type %r"%type(key))
    else:
        raise TypeError("names must be strs, not %r"%names)
    return self._really_apply(util._pull, (names,), block=block, targets=targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
    """
    Partition a Python sequence and send the partitions to a set of engines.

    Parameters
    ----------

    key : str
        name under which each engine receives its partition
    seq : sequence
        the sequence to partition
    dist : str
        key into `Map.dists` selecting the partitioning scheme
    flatten : bool
        if true, a partition of length 1 is pushed as its single element
        rather than as a one-element sequence
    targets : [default : self.targets]
        the engines to scatter to, one partition per target
    block : bool [default : self.block]
        whether to wait for the pushes to complete
    track : bool [default : self.track]
        passed to each `push`; if true, the per-push trackers are combined

    Returns
    -------

    None if `block` is true (after waiting), otherwise an AsyncResult
    covering all of the pushes.
    """
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    partitioner = Map.dists[dist]()
    n_engines = len(targets)
    sent_ids = []
    sent_trackers = []
    for position, engine in enumerate(targets):
        chunk = partitioner.getPartition(seq, position, n_engines)
        value = chunk[0] if flatten and len(chunk) == 1 else chunk
        pushed = self.push({key: value}, block=False, track=track, targets=engine)
        sent_ids.extend(pushed.msg_ids)
        if track:
            sent_trackers.append(pushed._tracker)

    tracker = zmq.MessageTracker(*sent_trackers) if track else None

    result = AsyncResult(self.client, sent_ids, fname='scatter', targets=targets, tracker=tracker)
    if not block:
        return result
    # blocking mode waits and then returns None
    result.wait()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
tweaks related to docs + add activate() for magics
@sync_results
@save_ids
def gather(self, key, dist='b', targets=None, block=None):
    """
    Gather a partitioned sequence on a set of engines as a single local seq.

    Parameters
    ----------

    key : str
        name to pull from each engine
    dist : str
        key into `Map.dists`; the resulting object is handed to the
        AsyncMapResult
    targets : [default : self.targets]
        the engines to pull from, one pull per target
    block : bool [default : self.block]
        whether to wait for the result

    Returns
    -------

    if `block` is true, the value of the AsyncMapResult's `get()`;
    otherwise (or if the wait is interrupted with KeyboardInterrupt)
    the AsyncMapResult itself.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets

    assembler = Map.dists[dist]()
    pull_ids = []
    for engine in targets:
        pending = self.pull(key, block=False, targets=engine)
        pull_ids.extend(pending.msg_ids)

    result = AsyncMapResult(self.client, pull_ids, assembler, fname='gather')

    if not block:
        return result
    try:
        return result.get()
    except KeyboardInterrupt:
        # fall through and hand back the pending result
        pass
    return result
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def __getitem__(self, key):
    """``view[key]`` is shorthand for ``view.get(key)``."""
    value = self.get(key)
    return value
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
basic LoadBalancedView, RemoteFunction
def __setitem__(self, key, value):
    """``view[key] = value`` is shorthand for ``view.update({key: value})``."""
    namespace = {key: value}
    self.update(namespace)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def clear(self, targets=None, block=False):
    """Clear the remote namespaces on my engines.

    Delegates to the client's `clear`. `targets` defaults to self.targets;
    `block` falls back to self.block only when passed explicitly as None.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.clear(targets=targets, block=block)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def kill(self, targets=None, block=True):
    """Kill my engines.

    Delegates to the client's `kill`. `targets` defaults to self.targets;
    `block` falls back to self.block only when passed explicitly as None.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.kill(targets=targets, block=block)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
tweaks related to docs + add activate() for magics
r3590 #----------------------------------------
# activate for %px,%autopx magics
#----------------------------------------
def activate(self):
    """Make this `View` active for parallel magic commands.

    The magic commands %px and %autopx operate on a single active `View`
    per IPython session, however many `Views` have been created.
    Calling activate() on a `View` makes it that active one by storing
    it as the ``active_view`` of the 'parallelmagic' plugin; the plugin
    is loaded first if it is not present yet.

    Outside of IPython a message is printed and nothing else happens.
    """

    try:
        # This is injected into __builtins__.
        shell = get_ipython()
    except NameError:
        print("The IPython parallel magics (%result, %px, %autopx) only work within IPython.")
        return

    pmagic = shell.plugin_manager.get_plugin('parallelmagic')
    if pmagic is None:
        # plugin not registered yet: load the extension, then look again
        shell.magic_load_ext('parallelmagic')
        pmagic = shell.plugin_manager.get_plugin('parallelmagic')
    pmagic.active_view = self
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
testing fixes
r3641
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `load_balanced_view` method:

    >>> v = client.load_balanced_view()

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.load_balanced_view([1,3])

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # scheduling flags; see `set_flags` for their meaning
    follow=Any()
    after=Any()
    timeout=CFloat()
    retries = CInt(0)

    # copied from the client in __init__; compared against 'pure' in _really_apply
    _task_scheme = Any()
    _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])

    def __init__(self, client=None, socket=None, **flags):
        """Initialise the View, then record the client's task scheme."""
        super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
        self._task_scheme=client._task_scheme

    def _validate_dependency(self, dep):
        """validate a dependency.

        For use in `set_flags`.

        Returns True for None, a string, an AsyncResult, a Dependency,
        a list/set/tuple of strings and AsyncResults, or a dict with the
        same keys as ``Dependency().as_dict()`` whose 'msg_ids' entry is
        a list of strings. Returns False for anything else.
        """
        if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
            return True
        elif isinstance(dep, (list,set, tuple)):
            for d in dep:
                if not isinstance(d, (basestring, AsyncResult)):
                    return False
        elif isinstance(dep, dict):
            if set(dep.keys()) != set(Dependency().as_dict().keys()):
                return False
            if not isinstance(dep['msg_ids'], list):
                return False
            for d in dep['msg_ids']:
                if not isinstance(d, basestring):
                    return False
        else:
            return False

        return True

    def _render_dependency(self, dep):
        """helper for building jsonable dependencies from various input forms."""
        if isinstance(dep, Dependency):
            return dep.as_dict()
        elif isinstance(dep, AsyncResult):
            return dep.msg_ids
        elif dep is None:
            return []
        else:
            # pass to Dependency constructor
            return list(Dependency(dep))

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.
        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.
        timeout : float/int or None
            Only for load-balanced execution (targets=None)
            Specify an amount of time (in seconds) for the scheduler to
            wait for dependencies to be met before failing with a
            DependencyTimeout.

        retries : int
            Number of times a task will be retried on failure.

        Raises
        ------

        ValueError
            if `follow` or `after` is not a valid dependency, or if
            `timeout` is negative
        TypeError
            if `timeout` is not an int, long, float or None
        """

        super(LoadBalancedView, self).set_flags(**kwargs)
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    # wrap in a 1-tuple: a tuple `value` would otherwise be
                    # unpacked by % and raise a formatting TypeError instead
                    raise ValueError("Invalid dependency: %r"%(value,))
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            if not isinstance(t, (int, long, float, type(None))):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t is not None:
                if t < 0:
                    raise ValueError("Invalid timeout: %s"%t)
            self.timeout = t

    @sync_results
    @save_ids
    def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
                                        after=None, follow=None, timeout=None,
                                        targets=None, retries=None):
        """calls f(*args, **kwargs) on a remote engine, returning the result.

        This method temporarily sets all of `apply`'s flags for a single call.

        Parameters
        ----------

        f : callable

        args : list [default: empty]

        kwargs : dict [default: empty]

        block : bool [default: self.block]
            whether to block
        track : bool [default: self.track]
            whether to ask zmq to track the message, for safe non-copying sends
        after : [default: self.after]
            time-based dependency; see `set_flags`
        follow : [default: self.follow]
            location-based dependency; see `set_flags`
        timeout : [default: self.timeout]
            dependency timeout; see `set_flags`
        targets : [default: self.targets]
            engines to restrict the task to; None means no restriction
            is sent to the scheduler
        retries : int [default: self.retries]
            number of times the task will be retried on failure

        Returns
        -------

        if `block` is false:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs), or the AsyncResult
            if waiting for it is interrupted by KeyboardInterrupt

        Raises
        ------

        RuntimeError
            if the socket is closed, or if the task scheme is 'pure' and
            any of follow/after/retries/targets/timeout was passed
        TypeError
            if `retries` is not an int
        """
        # validate whether we can run
        if self._socket.closed:
            msg = "Task farming is disabled"
            if self._task_scheme == 'pure':
                msg += " because the pure ZMQ scheduler cannot handle"
                msg += " disappearing engines."
            raise RuntimeError(msg)

        if self._task_scheme == 'pure':
            # pure zmq scheme doesn't support extra features
            # (parenthesised so both literals are joined into one message)
            msg = ("Pure ZMQ scheduler doesn't support the following flags: "
                   "follow, after, retries, targets, timeout")
            if (follow or after or retries or targets or timeout):
                # hard fail on Scheduler flags
                raise RuntimeError(msg)
            if isinstance(f, dependent):
                # soft warn on functional dependencies
                warnings.warn(msg, RuntimeWarning)

        # build args
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        after = self.after if after is None else after
        retries = self.retries if retries is None else retries
        follow = self.follow if follow is None else follow
        timeout = self.timeout if timeout is None else timeout
        targets = self.targets if targets is None else targets

        if not isinstance(retries, int):
            raise TypeError('retries must be int, not %r'%type(retries))

        if targets is None:
            idents = []
        else:
            idents = self.client._build_targets(targets)[0]
            # ensure *not* bytes
            idents = [ ident.decode() for ident in idents ]

        after = self._render_dependency(after)
        follow = self._render_dependency(follow)
        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)

        msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
                                subheader=subheader)
        tracker = None if track is False else msg['tracker']

        ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker)

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                pass
        return ar

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult

        Parallel version of builtin `map`, load-balanced by this View.

        `block`, and `chunksize` can be specified by keyword only.

        Each `chunksize` elements will be a separate task, and will be
        load-balanced. This lets individual elements be available for iteration
        as soon as they arrive.

        Parameters
        ----------

        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]
        track : bool
            accepted so that existing calls keep working.
            NOTE(review): it is not forwarded to the ParallelFunction,
            so it currently has no effect here - confirm intent.
        chunksize : int
            how many elements should be in each task [default 1]

        Returns
        -------

        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
            else:
                the result of map(f,*sequences)

        Raises
        ------

        TypeError
            if any keyword other than `block`, `chunksize` or `track` is given
        """

        # default
        block = kwargs.get('block', self.block)
        chunksize = kwargs.get('chunksize', 1)

        # difference() returns the leftover names; difference_update()
        # mutates in place and returns None, which never triggered the check
        extra_keys = set(kwargs).difference(['block', 'chunksize', 'track'])
        if extra_keys:
            raise TypeError("Invalid kwargs: %s"%sorted(extra_keys))

        assert len(sequences) > 0, "must have some sequences to map onto!"

        pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
        return pf.map(*sequences)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
# Names exported by ``from ... import *``.
__all__ = [
    'LoadBalancedView',
    'DirectView',
]