##// END OF EJS Templates
move IPython.zmq.parallel to IPython.parallel
move IPython.zmq.parallel to IPython.parallel

File last commit:

r3666:a6a0636a
r3666:a6a0636a
Show More
view.py
1027 lines | 35.7 KiB | text/x-python | PythonLexer
MinRK
cleanup pass
r3644 """Views of remote engines."""
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 import imp
import sys
MinRK
update API after sagedays29...
r3664 import warnings
from contextlib import contextmanager
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 from types import ModuleType
MinRK
update API after sagedays29...
r3664
import zmq
MinRK
testing fixes
r3641 from IPython.testing import decorators as testdec
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
MinRK
API update involving map and load-balancing
r3635
MinRK
prep newparallel for rebase...
r3539 from IPython.external.decorator import decorator
MinRK
eliminate relative imports
r3642
MinRK
update API after sagedays29...
r3664 from . import map as Map
from . import util
from .asyncresult import AsyncResult, AsyncMapResult
from .dependency import Dependency, dependent
MinRK
eliminate relative imports
r3642 from .remotefunction import ParallelFunction, parallel, remote
MinRK
prep newparallel for rebase...
r3539
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Every msg_id that the wrapped call appended to ``self.client.history`` is
    appended to ``self.history`` and added to ``self.outstanding``.  The
    bookkeeping happens in a ``finally`` clause, so it also runs when the
    wrapped call raises.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # Slice from the previous length instead of using a negative index:
        # with zero new messages, history[-0:] would be the *whole* history
        # and every old msg_id would be recorded again.
        msg_ids = self.client.history[n_previous:]
        self.history.extend(msg_ids)
        # set.update rather than map(): map is lazy on Python 3 and should
        # not be used purely for side effects.
        self.outstanding.update(msg_ids)
    return ret
@decorator
def sync_results(f, self, *args, **kwargs):
    """Sync relevant results from self.client to our results attribute.

    After the wrapped call returns, every msg_id in ``self.outstanding`` that
    is no longer in ``self.client.outstanding`` is removed from
    ``self.outstanding`` and its entry is copied from ``self.client.results``
    into ``self.results``.
    """
    ret = f(self, *args, **kwargs)
    no_longer_pending = self.outstanding.difference(self.client.outstanding)
    finished = self.outstanding.intersection(no_longer_pending)
    self.outstanding = self.outstanding.difference(finished)
    client_results = self.client.results
    for msg_id in finished:
        self.results[msg_id] = client_results[msg_id]
    return ret
@decorator
def spin_after(f, self, *args, **kwargs):
    """Run the wrapped method, then call ``self.spin()`` before returning its result."""
    result = f(self, *args, **kwargs)
    self.spin()
    return result
MinRK
some docstring cleanup
r3584 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
view decorators for syncing history/results
r3543
MinRK
API update involving map and load-balancing
r3635 class View(HasTraits):
MinRK
some docstring cleanup
r3584 """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.
Don't use this class, use subclasses.
MinRK
update API after sagedays29...
r3664
Methods
-------
spin
flushes incoming results and registration state changes
control methods spin, and requesting `ids` also ensures up to date
wait
wait on one or more msg_ids
execution methods
apply
legacy: execute, run
data movement
push, pull, scatter, gather
query methods
get_result, queue_status, purge_results, result_status
control methods
abort, shutdown
MinRK
some docstring cleanup
r3584 """
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 # flags
MinRK
API update involving map and load-balancing
r3635 block=Bool(False)
MinRK
update API after sagedays29...
r3664 track=Bool(True)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 targets = Any()
MinRK
API update involving map and load-balancing
r3635 history=List()
outstanding = Set()
results = Dict()
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 client = Instance('IPython.parallel.client.Client')
MinRK
API update involving map and load-balancing
r3635
MinRK
update API after sagedays29...
r3664 _socket = Instance('zmq.Socket')
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 _flag_names = List(['targets', 'block', 'track'])
MinRK
cleanup pass
r3644 _targets = Any()
MinRK
update API after sagedays29...
r3664 _idents = Any()
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def __init__(self, client=None, socket=None, **flags):
    """Set up a view on `client` that sends over `socket`.

    Parameters
    ----------

    client : the client this view submits through; its `block` attribute
        becomes this view's initial `block` flag
    socket : stored as the `_socket` trait
    **flags : passed to `set_flags`, so only names in `_flag_names` are accepted
    """
    super(View, self).__init__(client=client, _socket=socket)
    # start from the client's blocking behavior; explicit flags may override it
    self.block = client.block

    self.set_flags(**flags)

    assert self.__class__ is not View, "Don't use base View objects, use subclasses"
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
def __repr__(self):
    """Return ``<ClassName targets>``, abbreviating long target strings."""
    label = str(self.targets)
    too_long = len(label) > 16
    if too_long:
        # keep the first 12 characters and close the (presumed) list bracket
        label = label[:12] + '...]'
    cls_name = self.__class__.__name__
    return "<%s %s>" % (cls_name, label)
MinRK
view decorators for syncing history/results
r3543
MinRK
API update involving map and load-balancing
def set_flags(self, **kwargs):
    """set my attribute flags by keyword.

    Views determine behavior with a few attributes (`block`, `track`, etc.).
    These attributes can be set all at once by name with this method.

    Parameters
    ----------

    block : bool
        whether to wait for results
    track : bool
        whether to create a MessageTracker to allow the user to
        safely edit after arrays and buffers during non-copying
        sends.

    Raises
    ------

    KeyError : if any keyword is not listed in `self._flag_names`; in that
        case no flag is modified.
    """
    # Validate every name first, so a bad keyword cannot leave the view with
    # only some of the requested flags applied.
    for name in kwargs:
        if name not in self._flag_names:
            raise KeyError("Invalid name: %r"%name)
    # items() (not iteritems) works on both Python 2 and Python 3.
    for name, value in kwargs.items():
        setattr(self, name, value)
MinRK
API update involving map and load-balancing
r3635
MinRK
update API after sagedays29...
@contextmanager
def temp_flags(self, **kwargs):
    """temporarily set flags, for use in `with` statements.

    See set_flags for permanent setting of flags

    Examples
    --------

    >>> view.track=False
    ...
    >>> with view.temp_flags(track=True):
    ...    ar = view.apply(dostuff, my_big_array)
    ...    ar.tracker.wait() # wait for send to finish
    >>> view.track
    False

    """
    # preflight: snapshot every known flag so it can be put back afterwards
    saved_flags = dict((name, getattr(self, name)) for name in self._flag_names)
    try:
        # Apply the temporaries inside the try: if set_flags fails part-way
        # through, whatever it already changed is still restored below.
        self.set_flags(**kwargs)
        # yield to the with-statement block
        yield
    finally:
        # postflight: restore saved flags
        self.set_flags(**saved_flags)
MinRK
update API after sagedays29...
r3664
MinRK
API update involving map and load-balancing
r3635 #----------------------------------------------------------------
MinRK
update API after sagedays29...
r3664 # apply
MinRK
API update involving map and load-balancing
r3635 #----------------------------------------------------------------
MinRK
allow load-balancing across subsets of engines
r3625
MinRK
view decorators for syncing history/results
@sync_results
@save_ids
def _really_apply(self, f, args, kwargs, block=None, **options):
    """Submission hook used by apply, apply_async and apply_sync.

    Intended as the wrapper for client.send_apply_message; the base class
    provides no implementation and always raises NotImplementedError.
    """
    raise NotImplementedError("Implement in subclasses")
MinRK
prep newparallel for rebase...
def apply(self, f, *args, **kwargs):
    """calls f(*args, **kwargs) on remote engines, returning the result.

    No per-call flags are passed on; the call is forwarded to
    `_really_apply` with only `f`, `args` and `kwargs`, so this View's
    attributes decide the behavior.

    if self.block is False:
        returns AsyncResult
    else:
        returns actual result of f(*args, **kwargs)
    """
    submit = self._really_apply
    return submit(f, args, kwargs)
MinRK
view decorators for syncing history/results
r3543
MinRK
prep newparallel for rebase...
def apply_async(self, f, *args, **kwargs):
    """calls f(*args, **kwargs) on remote engines in a nonblocking manner.

    Forwards to `_really_apply` with block=False.

    returns AsyncResult
    """
    submit = self._really_apply
    return submit(f, args, kwargs, block=False)
MinRK
view decorators for syncing history/results
r3543
@spin_after
def apply_sync(self, f, *args, **kwargs):
    """calls f(*args, **kwargs) on remote engines in a blocking manner,
    returning the result.

    Forwards to `_really_apply` with block=True.

    returns: actual result of f(*args, **kwargs)
    """
    submit = self._really_apply
    return submit(f, args, kwargs, block=True)
MinRK
view decorators for syncing history/results
r3543
MinRK
update API after sagedays29...
r3664 #----------------------------------------------------------------
# wrappers for client and control methods
#----------------------------------------------------------------
MinRK
view decorators for syncing history/results
@sync_results
def spin(self):
    """Spin the client; the `sync_results` decorator then syncs our results."""
    client = self.client
    client.spin()
@sync_results
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: everything in this view's history
    timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    to_wait_on = self.history if jobs is None else jobs
    return self.client.wait(to_wait_on, timeout)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def abort(self, jobs=None, targets=None, block=None):
    """Abort jobs on my engines.

    Parameters
    ----------

    jobs : None, str, list of strs, optional
        if None: abort all jobs.
        else: abort specific msg_id(s).
    targets : optional
        defaults to self.targets when None
    block : optional
        defaults to self.block when None
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.abort(jobs=jobs, targets=targets, block=block)
MinRK
basic LoadBalancedView, RemoteFunction
r3559
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def queue_status(self, targets=None, verbose=False):
    """Fetch the Queue status of my engines.

    `targets` defaults to self.targets when None; `verbose` is passed
    through to the client unchanged.
    """
    if targets is None:
        targets = self.targets
    return self.client.queue_status(targets=targets, verbose=verbose)
MinRK
basic LoadBalancedView, RemoteFunction
r3559
MinRK
split get_results into get_result/result_status, add AsyncHubResult
def purge_results(self, jobs=[], targets=[]):
    """Instruct the controller to forget specific results.

    `targets` of None or 'all' is replaced by self.targets; any other value
    (including the default empty list) is passed to the client as given.
    """
    wants_my_targets = targets is None or targets == 'all'
    chosen = self.targets if wants_my_targets else targets
    return self.client.purge_results(jobs=jobs, targets=chosen)
@spin_after
def get_result(self, indices_or_msg_ids=None):
    """return one or more results, specified by history index or msg_id.

    Integer entries are looked up in self.history to obtain msg_ids; the
    default (None) means index -1, i.e. the last history entry.

    See client.get_result for details.
    """
    spec = -1 if indices_or_msg_ids is None else indices_or_msg_ids

    if isinstance(spec, int):
        spec = self.history[spec]
    elif isinstance(spec, (list,tuple,set)):
        # translate any history indices in the collection, keep msg_ids as-is
        spec = [self.history[item] if isinstance(item, int) else item
                for item in spec]
    return self.client.get_result(spec)
MinRK
API update involving map and load-balancing
r3635 #-------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # Map
#-------------------------------------------------------------------
def map(self, f, *sequences, **kwargs):
    """Placeholder for the parallel map; the base class always raises
    NotImplementedError, so subclasses must override it."""
    raise NotImplementedError
def map_async(self, f, *sequences, **kwargs):
    """Parallel version of builtin `map`, using this view's engines.

    This is equivalent to map(...block=False)

    See `self.map` for details.

    Raises TypeError if a `block` keyword is supplied.
    """
    if 'block' in kwargs:
        raise TypeError("map_async doesn't take a `block` keyword argument.")
    forwarded = dict(kwargs, block=False)
    return self.map(f, *sequences, **forwarded)
def map_sync(self, f, *sequences, **kwargs):
    """Parallel version of builtin `map`, using this view's engines.

    This is equivalent to map(...block=True)

    See `self.map` for details.

    Raises TypeError if a `block` keyword is supplied.
    """
    if 'block' in kwargs:
        raise TypeError("map_sync doesn't take a `block` keyword argument.")
    forwarded = dict(kwargs, block=True)
    return self.map(f, *sequences, **forwarded)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
def imap(self, f, *sequences, **kwargs):
    """Parallel version of `itertools.imap`.

    Returns an iterator over the result of `map_async`.

    See `self.map` for details.
    """
    async_result = self.map_async(f, *sequences, **kwargs)
    return iter(async_result)
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 #-------------------------------------------------------------------
MinRK
API update involving map and load-balancing
r3635 # Decorators
#-------------------------------------------------------------------
MinRK
update API after sagedays29...
def remote(self, block=True, **flags):
    """Decorator for making a RemoteFunction.

    `block` defaults to True; passing block=None uses self.block instead.
    Remaining keyword flags are handed to the module-level `remote`.
    """
    if block is None:
        block = self.block
    return remote(self, block=block, **flags)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
update API after sagedays29...
def parallel(self, dist='b', block=None, **flags):
    """Decorator for making a ParallelFunction.

    `block` falls back to self.block when None; `dist` and the remaining
    keyword flags are handed to the module-level `parallel`.
    """
    if block is None:
        block = self.block
    return parallel(self, dist=dist, block=block, **flags)
MinRK
view decorators for syncing history/results
r3543
MinRK
testing fixes
r3641 @testdec.skip_doctest
MinRK
prep newparallel for rebase...
r3539 class DirectView(View):
MinRK
some docstring cleanup
r3584 """Direct Multiplexer View of one or more engines.
These are created via indexed access to a client:
>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
MinRK
dependency tweaks + dependency/scheduler docs
r3624 This object provides dictionary access to engine namespaces:
# push a=5:
>>> dv['a'] = 5
# pull 'foo':
>>> dv['foo']
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
some docstring cleanup
r3584 """
MinRK
prep newparallel for rebase...
r3539
MinRK
update API after sagedays29...
def __init__(self, client=None, socket=None, targets=None):
    """Create the view; `targets` is handed to the base class as a flag."""
    flags = dict(targets=targets)
    super(DirectView, self).__init__(client=client, socket=socket, **flags)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665
@property
def importer(self):
    """sync_imports(local=True) as a property.

    See sync_imports for details.

    In [10]: with v.importer:
       ....:     import numpy
       ....:
    importing numpy on engine(s)

    """
    import_locally = True
    return self.sync_imports(import_locally)
@contextmanager
def sync_imports(self, local=True):
    """Context Manager for performing simultaneous local and remote imports.

    'import x as y' will *not* work.  The 'as y' part will simply be ignored.

    >>> with view.sync_imports():
    ...    from numpy import recarray
    importing recarray from numpy on engine(s)

    Parameters
    ----------

    local : bool [default: True]
        whether to also perform the import locally.  Remote-only imports
        (local=False) are not implemented and raise NotImplementedError
        at the first import inside the block.

    Raises
    ------

    ImportError : a local ImportError inside the block is re-raised when
        `local` is True; it is ignored only when `local` is False.
    Whatever ``.get()`` raises on the collected remote results after the
    block has finished.
    """
    import __builtin__
    local_import = __builtin__.__import__
    modules = set()
    results = []

    @util.interactive
    def remote_import(name, fromlist, level):
        """the function to be passed to apply, that actually performs the import
        on the engine, and loads up the user namespace.
        """
        import sys
        user_ns = globals()
        mod = __import__(name, fromlist=fromlist, level=level)
        if fromlist:
            for key in fromlist:
                user_ns[key] = getattr(mod, key)
        else:
            user_ns[name] = sys.modules[name]

    def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
        """the drop-in replacement for __import__, that optionally imports
        locally as well.
        """
        # don't override nested imports
        save_import = __builtin__.__import__
        __builtin__.__import__ = local_import

        if imp.lock_held():
            # this is a side-effect import, don't do it remotely, or even
            # ignore the local effects
            return local_import(name, globals, locals, fromlist, level)

        imp.acquire_lock()
        try:
            if local:
                mod = local_import(name, globals, locals, fromlist, level)
            else:
                raise NotImplementedError("remote-only imports not yet implemented")
        finally:
            # always give the import lock back, even when the import (or the
            # NotImplementedError above) raises; otherwise it stays held
            imp.release_lock()

        key = name+':'+','.join(fromlist or [])
        if level == -1 and key not in modules:
            modules.add(key)
            # print(<one string>) writes the same text under the Python 2
            # print statement and the Python 3 print function
            if fromlist:
                print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
            else:
                print("importing %s on engine(s)"%name)
            results.append(self.apply_async(remote_import, name, fromlist, level))
        # restore override
        __builtin__.__import__ = save_import

        return mod

    # override __import__
    __builtin__.__import__ = view_import
    try:
        # enter the block
        yield
    except ImportError:
        if local:
            # a failed local import is a real error for the caller
            raise
        # otherwise: ignore import errors if not doing local imports
    finally:
        # always restore __import__
        __builtin__.__import__ = local_import

    for r in results:
        # raise possible remote ImportErrors here
        r.get()
MinRK
API update involving map and load-balancing
r3635
MinRK
update API after sagedays29...
@sync_results
@save_ids
def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
    """calls f(*args, **kwargs) on remote engines, returning the result.

    This method sets all of `apply`'s flags via this View's attributes.

    Parameters
    ----------

    f : callable

    args : list [default: empty]

    kwargs : dict [default: empty]

    targets : target list [default: self.targets]
        where to run
    block : bool [default: self.block]
        whether to block
    track : bool [default: self.track]
        whether to ask zmq to track the message, for safe non-copying sends

    Returns
    -------

    if self.block is False:
        returns AsyncResult
    else:
        returns actual result of f(*args, **kwargs) on the engine(s)
        This will be a list of self.targets is also a list (even length 1), or
        the single result if self.targets is an integer engine id
    """
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    idents = self.client._build_targets(targets)[0]
    msg_ids, trackers = [], []
    # one apply message per engine identity
    for ident in idents:
        msg = self.client.send_apply_message(self._socket, f, args, kwargs,
                                             track=track, ident=ident)
        if track:
            trackers.append(msg['tracker'])
        msg_ids.append(msg['msg_id'])

    if track is False:
        tracker = None
    else:
        tracker = zmq.MessageTracker(*trackers)
    ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
    if not block:
        return ar
    try:
        return ar.get()
    except KeyboardInterrupt:
        # interrupted while waiting: hand back the AsyncResult instead
        pass
    return ar
@spin_after
def map(self, f, *sequences, **kwargs):
    """view.map(f, *sequences, block=self.block) => list|AsyncMapResult

    Parallel version of builtin `map`, using this View's `targets`.

    There will be one task per target, so work will be chunked
    if the sequences are longer than `targets`.

    Results can be iterated as they are ready, but will become available in chunks.

    Parameters
    ----------

    f : callable
        function to be mapped
    *sequences: one or more sequences of matching length
        the sequences to be distributed and passed to `f`
    block : bool
        whether to wait for the result or not [default self.block]

    Returns
    -------

    if block=False:
        AsyncMapResult
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
    else:
        list
            the result of map(f,*sequences)
    """
    block = kwargs.pop('block', self.block)
    accepted = ['block', 'track']
    for key in kwargs.keys():
        if key not in accepted:
            raise TypeError("invalid keyword arg, %r"%key)

    assert len(sequences) > 0, "must have some sequences to map onto!"
    parallel_f = ParallelFunction(self, f, block=block, **kwargs)
    return parallel_f.map(*sequences)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def execute(self, code, targets=None, block=None):
    """Executes `code` on `targets` in blocking or nonblocking manner.

    ``execute`` is always `bound` (affects engine namespace)

    Parameters
    ----------

    code : str
            the code string to be executed
    targets : optional
            passed to `_really_apply` unchanged
    block : bool
            whether or not to wait until done to return
            default: self.block
    """
    call_args = (code,)
    return self._really_apply(util._execute, args=call_args, block=block, targets=targets)
MinRK
tweaks related to docs + add activate() for magics
r3590
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def run(self, filename, targets=None, block=None):
    """Execute contents of `filename` on my engine(s).

    This simply reads the contents of the file and calls `execute`.

    Parameters
    ----------

    filename : str
            The path to the file
    targets : int/str/list of ints/strs
            the engines on which to execute
            default : all
    block : bool
            whether or not to wait until done
            default: self.block

    """
    with open(filename, 'r') as source_file:
        contents = source_file.read()
    # add newline in case of trailing indented whitespace
    # which will cause SyntaxError
    code = contents + '\n'
    return self.execute(code, block=block, targets=targets)
MinRK
eliminate relative imports
r3642
MinRK
prep newparallel for rebase...
def update(self, ns):
    """update remote namespace with dict `ns`

    Calls `push` with this view's current `block` and `track` flags.

    See `push` for details.
    """
    flags = dict(block=self.block, track=self.track)
    return self.push(ns, **flags)
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def push(self, ns, targets=None, block=None, track=None):
    """update remote namespace with dict `ns`

    Parameters
    ----------

    ns : dict
        dict of keys with which to update engine namespace(s)
    targets : [default : self.targets]
        where to push
    block : bool [default : self.block]
        whether to wait to be notified of engine receipt
    track : bool [default : self.track]
        passed to `_really_apply`

    Raises
    ------

    TypeError : if `ns` is not a dict
    """
    if not isinstance(ns, dict):
        raise TypeError("Must be a dict, not %s"%type(ns))
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets
    return self._really_apply(util._push, (ns,), block=block, track=track, targets=targets)
MinRK
testing fixes
r3641
MinRK
prep newparallel for rebase...
def get(self, key_s):
    """get object(s) by `key_s` from remote namespace

    Always blocking: this is `pull` called with block=True.

    see `pull` for details.
    """
    fetched = self.pull(key_s, block=True)
    return fetched
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def pull(self, names, targets=None, block=True):
    """get object(s) by `name` from remote namespace

    will return one object if it is a key.
    can also take a list of keys, in which case it will return a list of objects.

    Parameters
    ----------

    names : str, or list/tuple/set of strs
        the name(s) to fetch
    targets : [default : self.targets]
        where to pull from
    block : bool [default : True; None means self.block]
        whether to wait for the result

    Raises
    ------

    TypeError : if `names` is neither a string nor a list/tuple/set of strings
    """
    block = block if block is not None else self.block
    targets = targets if targets is not None else self.targets
    # (the unused `applier` local of the previous version was dropped: the
    # request always goes through _really_apply below)
    if isinstance(names, basestring):
        pass
    elif isinstance(names, (list,tuple,set)):
        for key in names:
            if not isinstance(key, basestring):
                raise TypeError("keys must be str, not type %r"%type(key))
    else:
        raise TypeError("names must be strs, not %r"%names)
    return self._really_apply(util._pull, (names,), block=block, targets=targets)
MinRK
prep newparallel for rebase...
r3539
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
    """
    Partition a Python sequence and send the partitions to a set of engines.

    One partition of `seq` is pushed under the name `key` to each entry of
    `targets`.  With `flatten`, a partition of length one is pushed as its
    single element rather than as a sequence.  `targets`, `block` and
    `track` default to this view's attributes.

    Returns the combined AsyncResult when not blocking; when blocking, waits
    on it and returns None.
    """
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    mapObject = Map.dists[dist]()
    nparts = len(targets)
    msg_ids, trackers = [], []
    for index, engineid in enumerate(targets):
        partition = mapObject.getPartition(seq, index, nparts)
        unwrap = flatten and len(partition) == 1
        ns = {key: partition[0] if unwrap else partition}
        pushed = self.push(ns, block=False, track=track, targets=engineid)
        msg_ids.extend(pushed.msg_ids)
        if track:
            trackers.append(pushed._tracker)

    tracker = zmq.MessageTracker(*trackers) if track else None

    r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
    if not block:
        return r
    r.wait()
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
tweaks related to docs + add activate() for magics
@sync_results
@save_ids
def gather(self, key, dist='b', targets=None, block=None):
    """
    Gather a partitioned sequence on a set of engines as a single local seq.

    `key` is pulled (non-blocking) from each entry of `targets` and the
    resulting msg_ids are wrapped in one AsyncMapResult.  `targets` and
    `block` default to this view's attributes.  When blocking, the joined
    result is returned, unless the wait is interrupted from the keyboard,
    in which case the AsyncMapResult is returned instead.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets

    mapObject = Map.dists[dist]()
    msg_ids = []
    for engineid in targets:
        pulled = self.pull(key, block=False, targets=engineid)
        msg_ids.extend(pulled.msg_ids)

    r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
    if not block:
        return r
    try:
        return r.get()
    except KeyboardInterrupt:
        pass
    return r
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
prep newparallel for rebase...
def __getitem__(self, key):
    """Support `view[key]` by delegating to `self.get(key)`."""
    fetched = self.get(key)
    return fetched
MinRK
basic LoadBalancedView, RemoteFunction
def __setitem__(self, key, value):
    """Support `view[key] = value` by passing `{key: value}` to `self.update`."""
    namespace = {key: value}
    self.update(namespace)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def clear(self, targets=None, block=False):
    """Clear the remote namespaces on my engines.

    `targets` falls back to `self.targets` and `block` to `self.block`
    when given as None; the request is forwarded to `self.client.clear`
    and its return value is passed back.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.clear(targets=targets, block=block)
MinRK
control channel progress
r3540
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
def kill(self, targets=None, block=True):
    """Kill my engines.

    `targets` falls back to `self.targets` and `block` to `self.block`
    when given as None; the request is forwarded to `self.client.kill`
    and its return value is passed back.
    """
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.kill(targets=targets, block=block)
MinRK
prep newparallel for rebase...
r3539
MinRK
tweaks related to docs + add activate() for magics
r3590 #----------------------------------------
# activate for %px,%autopx magics
#----------------------------------------
def activate(self):
"""Make this `View` active for parallel magic commands.
IPython has a magic command syntax to work with `MultiEngineClient` objects.
In a given IPython session there is a single active one. While
there can be many `Views` created and used by the user,
there is only one active one. The active `View` is used whenever
the magic commands %px and %autopx are used.
The activate() method is called on a given `View` to make it
active. Once this has been done, the magic commands can be used.
"""
try:
# This is injected into __builtins__.
ip = get_ipython()
except NameError:
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
else:
pmagic = ip.plugin_manager.get_plugin('parallelmagic')
if pmagic is not None:
pmagic.active_multiengine_client = self
else:
print "You must first load the parallelmagic extension " \
"by doing '%load_ext parallelmagic'"
MinRK
testing fixes
r3641
@testdec.skip_doctest
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's
    `load_balanced_view` method:

    >>> v = client.load_balanced_view()

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.load_balanced_view([1,3])

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # dependency flags; validated by `set_flags`, rendered by `_render_dependency`
    follow=Any()
    after=Any()
    # seconds the scheduler waits for dependencies (see `set_flags`)
    timeout=CFloat()

    # copied from the client in __init__; 'pure' disables dependency support
    _task_scheme = Any()
    _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])

    def __init__(self, client=None, socket=None, **flags):
        """Build the view and remember the client's task scheme."""
        super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
        self._task_scheme=client._task_scheme

    def _validate_dependency(self, dep):
        """validate a dependency.

        For use in `set_flags`.

        Accepted forms are None, a str, an AsyncResult, a Dependency,
        a list/set/tuple whose items are each str or AsyncResult, or a
        dict with exactly the keys of `Dependency().as_dict()` whose
        'msg_ids' entry is a list of str.

        Returns True if `dep` has one of those forms, False otherwise.
        """
        if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
            return True
        elif isinstance(dep, (list,set, tuple)):
            for d in dep:
                if not isinstance(d, (str, AsyncResult)):
                    return False
        elif isinstance(dep, dict):
            # a dict must look exactly like a serialized Dependency
            if set(dep.keys()) != set(Dependency().as_dict().keys()):
                return False
            if not isinstance(dep['msg_ids'], list):
                return False
            for d in dep['msg_ids']:
                if not isinstance(d, str):
                    return False
        else:
            return False

        return True

    def _render_dependency(self, dep):
        """helper for building jsonable dependencies from various input forms.

        A Dependency becomes its `as_dict()`, an AsyncResult becomes its
        `msg_ids`, None becomes an empty list, and anything else is
        passed to the Dependency constructor and returned as a list.
        """
        if isinstance(dep, Dependency):
            return dep.as_dict()
        elif isinstance(dep, AsyncResult):
            return dep.msg_ids
        elif dep is None:
            return []
        else:
            # pass to Dependency constructor
            return list(Dependency(dep))

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit arrays and buffers after non-copying sends.

        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.

        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.

        timeout : float/int or None
            Only for load-balanced execution (targets=None)
            Specify an amount of time (in seconds) for the scheduler to
            wait for dependencies to be met before failing with a
            DependencyTimeout.

        Raises
        ------
        ValueError
            if `follow` or `after` fails `_validate_dependency`, or if
            `timeout` is negative
        TypeError
            if `timeout` is not an int, long, float or None
        """
        # the parent class handles the flags it knows about first
        super(LoadBalancedView, self).set_flags(**kwargs)
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    raise ValueError("Invalid dependency: %r"%value)
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            if not isinstance(t, (int, long, float, type(None))):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t is not None:
                if t < 0:
                    raise ValueError("Invalid timeout: %s"%t)
            self.timeout = t

    @sync_results
    @save_ids
    def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
                                        after=None, follow=None, timeout=None,
                                        targets=None):
        """calls f(*args, **kwargs) on a remote engine, returning the result.

        This method temporarily sets all of `apply`'s flags for a single call.
        Any flag left as None falls back to the attribute of the same name
        on this view.

        Parameters
        ----------

        f : callable

        args : list [default: empty]

        kwargs : dict [default: empty]

        block : bool [default: self.block]
            whether to block
        track : bool [default: self.track]
            whether to ask zmq to track the message, for safe non-copying sends
        after : Dependency or collection of msg_ids [default: self.after]
            time-based dependency, as described in `set_flags`
        follow : Dependency or collection of msg_ids [default: self.follow]
            location-based dependency, as described in `set_flags`
        timeout : float/int or None [default: self.timeout]
            seconds for the scheduler to wait for dependencies
        targets : [default: self.targets]
            engines the task may be sent to; when None, an empty list of
            identities is placed in the message subheader

        Returns
        -------

        if block is False:
            returns AsyncResult
        else:
            returns the value of the AsyncResult's `get()`, or the
            AsyncResult itself if waiting is interrupted with
            KeyboardInterrupt

        Raises
        ------
        RuntimeError
            if the socket is closed, or if `follow`/`after` are given
            while the task scheme is 'pure'
        """

        # validate whether we can run
        if self._socket.closed:
            msg = "Task farming is disabled"
            if self._task_scheme == 'pure':
                msg += " because the pure ZMQ scheduler cannot handle"
                msg += " disappearing engines."
            raise RuntimeError(msg)

        if self._task_scheme == 'pure':
            # pure zmq scheme doesn't support dependencies
            msg = "Pure ZMQ scheduler doesn't support dependencies"
            if (follow or after):
                # hard fail on DAG dependencies
                raise RuntimeError(msg)
            if isinstance(f, dependent):
                # soft warn on functional dependencies
                warnings.warn(msg, RuntimeWarning)

        # build args
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        after = self.after if after is None else after
        follow = self.follow if follow is None else follow
        timeout = self.timeout if timeout is None else timeout
        targets = self.targets if targets is None else targets

        if targets is None:
            idents = []
        else:
            idents = self.client._build_targets(targets)[0]

        # convert the dependencies to their jsonable forms for the subheader
        after = self._render_dependency(after)
        follow = self._render_dependency(follow)
        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)

        msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
                                subheader=subheader)
        tracker = None if track is False else msg['tracker']

        ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                # fall through and return the pending AsyncResult
                pass
        return ar

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult

        Parallel version of builtin `map`, load-balanced by this View.

        `block`, and `chunksize` can be specified by keyword only.

        Each `chunksize` elements will be a separate task, and will be
        load-balanced. This lets individual elements be available for iteration
        as soon as they arrive.

        Parameters
        ----------

        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]
        chunksize : int
            how many elements should be in each task [default 1]

        Returns
        -------

        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
        else:
            the result of map(f,*sequences)

        Raises
        ------
        TypeError
            if any keyword other than `block` or `chunksize` is given
        """

        # default
        block = kwargs.get('block', self.block)
        chunksize = kwargs.get('chunksize', 1)

        # `difference` returns the leftover keys; `difference_update`
        # mutates in place and returns None, which hid invalid kwargs.
        extra_keys = set(kwargs.keys()).difference(set(['block', 'chunksize']))
        if extra_keys:
            raise TypeError("Invalid kwargs: %s"%sorted(extra_keys))

        assert len(sequences) > 0, "must have some sequences to map onto!"

        pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
        return pf.map(*sequences)
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665
MinRK
cleanup pass
# Public names exported by this module.
__all__ = [
    'LoadBalancedView',
    'DirectView',
]