view.py
1048 lines
| 36.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """Views of remote engines. | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3584 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3584 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3665 | import imp | ||
import sys | ||||
MinRK
|
r3664 | import warnings | ||
from contextlib import contextmanager | ||||
MinRK
|
r3665 | from types import ModuleType | ||
MinRK
|
r3664 | |||
import zmq | ||||
Thomas Kluyver
|
r3886 | from IPython.testing.skipdoctest import skip_doctest | ||
MinRK
|
r3873 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt | ||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3642 | |||
MinRK
|
r3673 | from IPython.parallel import util | ||
from IPython.parallel.controller.dependency import Dependency, dependent | ||||
MinRK
|
r3664 | from . import map as Map | ||
from .asyncresult import AsyncResult, AsyncMapResult | ||||
MinRK
|
r3642 | from .remotefunction import ParallelFunction, parallel, remote | ||
MinRK
|
r3539 | |||
MinRK
|
r3584 | #----------------------------------------------------------------------------- | ||
# Decorators | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
@decorator | ||||
MinRK
|
r3543 | def save_ids(f, self, *args, **kwargs): | ||
MinRK
|
r3584 | """Keep our history and outstanding attributes up to date after a method call.""" | ||
MinRK
|
r3588 | n_previous = len(self.client.history) | ||
MinRK
|
r3664 | try: | ||
ret = f(self, *args, **kwargs) | ||||
finally: | ||||
nmsgs = len(self.client.history) - n_previous | ||||
msg_ids = self.client.history[-nmsgs:] | ||||
self.history.extend(msg_ids) | ||||
map(self.outstanding.add, msg_ids) | ||||
MinRK
|
r3543 | return ret | ||
@decorator | ||||
def sync_results(f, self, *args, **kwargs): | ||||
MinRK
|
r3584 | """sync relevant results from self.client to our results attribute.""" | ||
MinRK
|
r3543 | ret = f(self, *args, **kwargs) | ||
delta = self.outstanding.difference(self.client.outstanding) | ||||
completed = self.outstanding.intersection(delta) | ||||
self.outstanding = self.outstanding.difference(completed) | ||||
for msg_id in completed: | ||||
self.results[msg_id] = self.client.results[msg_id] | ||||
return ret | ||||
@decorator | ||||
def spin_after(f, self, *args, **kwargs): | ||||
MinRK
|
r3584 | """call spin after the method.""" | ||
MinRK
|
r3543 | ret = f(self, *args, **kwargs) | ||
self.spin() | ||||
return ret | ||||
MinRK
|
r3584 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3543 | |||
Thomas Kluyver
|
r3886 | @skip_doctest | ||
MinRK
|
r3635 | class View(HasTraits): | ||
MinRK
|
r3584 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. | ||
Don't use this class, use subclasses. | ||||
MinRK
|
r3664 | |||
Methods | ||||
------- | ||||
spin | ||||
flushes incoming results and registration state changes | ||||
control methods spin, and requesting `ids` also ensures up to date | ||||
wait | ||||
wait on one or more msg_ids | ||||
execution methods | ||||
apply | ||||
legacy: execute, run | ||||
data movement | ||||
push, pull, scatter, gather | ||||
query methods | ||||
get_result, queue_status, purge_results, result_status | ||||
control methods | ||||
abort, shutdown | ||||
MinRK
|
r3584 | """ | ||
MinRK
|
r3665 | # flags | ||
MinRK
|
r3635 | block=Bool(False) | ||
MinRK
|
r3664 | track=Bool(True) | ||
MinRK
|
r3665 | targets = Any() | ||
MinRK
|
r3635 | history=List() | ||
outstanding = Set() | ||||
results = Dict() | ||||
MinRK
|
r3673 | client = Instance('IPython.parallel.Client') | ||
MinRK
|
r3635 | |||
MinRK
|
r3664 | _socket = Instance('zmq.Socket') | ||
MinRK
|
r3665 | _flag_names = List(['targets', 'block', 'track']) | ||
MinRK
|
r3644 | _targets = Any() | ||
MinRK
|
r3664 | _idents = Any() | ||
MinRK
|
r3539 | |||
MinRK
|
r3665 | def __init__(self, client=None, socket=None, **flags): | ||
MinRK
|
r3664 | super(View, self).__init__(client=client, _socket=socket) | ||
MinRK
|
r3539 | self.block = client.block | ||
MinRK
|
r3635 | |||
MinRK
|
r3665 | self.set_flags(**flags) | ||
MinRK
|
r3625 | |||
MinRK
|
r3639 | assert not self.__class__ is View, "Don't use base View objects, use subclasses" | ||
MinRK
|
r3543 | |||
MinRK
|
r3539 | def __repr__(self): | ||
MinRK
|
r3665 | strtargets = str(self.targets) | ||
MinRK
|
r3539 | if len(strtargets) > 16: | ||
strtargets = strtargets[:12]+'...]' | ||||
return "<%s %s>"%(self.__class__.__name__, strtargets) | ||||
MinRK
|
r3543 | |||
MinRK
|
r3635 | def set_flags(self, **kwargs): | ||
"""set my attribute flags by keyword. | ||||
MinRK
|
r3664 | Views determine behavior with a few attributes (`block`, `track`, etc.). | ||
These attributes can be set all at once by name with this method. | ||||
MinRK
|
r3635 | |||
Parameters | ||||
---------- | ||||
block : bool | ||||
whether to wait for results | ||||
MinRK
|
r3655 | track : bool | ||
whether to create a MessageTracker to allow the user to | ||||
safely edit after arrays and buffers during non-copying | ||||
sends. | ||||
MinRK
|
r3635 | """ | ||
MinRK
|
r3664 | for name, value in kwargs.iteritems(): | ||
if name not in self._flag_names: | ||||
raise KeyError("Invalid name: %r"%name) | ||||
else: | ||||
setattr(self, name, value) | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | @contextmanager | ||
def temp_flags(self, **kwargs): | ||||
"""temporarily set flags, for use in `with` statements. | ||||
See set_flags for permanent setting of flags | ||||
Examples | ||||
-------- | ||||
>>> view.track=False | ||||
... | ||||
>>> with view.temp_flags(track=True): | ||||
... ar = view.apply(dostuff, my_big_array) | ||||
... ar.tracker.wait() # wait for send to finish | ||||
>>> view.track | ||||
False | ||||
""" | ||||
# preflight: save flags, and set temporaries | ||||
saved_flags = {} | ||||
for f in self._flag_names: | ||||
saved_flags[f] = getattr(self, f) | ||||
self.set_flags(**kwargs) | ||||
# yield to the with-statement block | ||||
MinRK
|
r3665 | try: | ||
yield | ||||
finally: | ||||
# postflight: restore saved flags | ||||
self.set_flags(**saved_flags) | ||||
MinRK
|
r3664 | |||
MinRK
|
r3635 | #---------------------------------------------------------------- | ||
MinRK
|
r3664 | # apply | ||
MinRK
|
r3635 | #---------------------------------------------------------------- | ||
MinRK
|
r3625 | |||
MinRK
|
r3543 | @sync_results | ||
@save_ids | ||||
MinRK
|
r3664 | def _really_apply(self, f, args, kwargs, block=None, **options): | ||
"""wrapper for client.send_apply_message""" | ||||
raise NotImplementedError("Implement in subclasses") | ||||
MinRK
|
r3539 | def apply(self, f, *args, **kwargs): | ||
"""calls f(*args, **kwargs) on remote engines, returning the result. | ||||
MinRK
|
r3664 | This method sets all apply flags via this View's attributes. | ||
MinRK
|
r3539 | |||
if self.block is False: | ||||
MinRK
|
r3655 | returns AsyncResult | ||
MinRK
|
r3539 | else: | ||
returns actual result of f(*args, **kwargs) | ||||
""" | ||||
MinRK
|
r3664 | return self._really_apply(f, args, kwargs) | ||
MinRK
|
r3543 | |||
MinRK
|
r3539 | def apply_async(self, f, *args, **kwargs): | ||
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner. | ||||
MinRK
|
r3655 | returns AsyncResult | ||
MinRK
|
r3539 | """ | ||
MinRK
|
r3664 | return self._really_apply(f, args, kwargs, block=False) | ||
MinRK
|
r3543 | |||
@spin_after | ||||
MinRK
|
r3539 | def apply_sync(self, f, *args, **kwargs): | ||
"""calls f(*args, **kwargs) on remote engines in a blocking manner, | ||||
returning the result. | ||||
returns: actual result of f(*args, **kwargs) | ||||
""" | ||||
MinRK
|
r3664 | return self._really_apply(f, args, kwargs, block=True) | ||
MinRK
|
r3543 | |||
MinRK
|
r3664 | #---------------------------------------------------------------- | ||
# wrappers for client and control methods | ||||
#---------------------------------------------------------------- | ||||
MinRK
|
r3543 | @sync_results | ||
MinRK
|
r3664 | def spin(self): | ||
"""spin the client, and sync""" | ||||
self.client.spin() | ||||
@sync_results | ||||
def wait(self, jobs=None, timeout=-1): | ||||
"""waits on one or more `jobs`, for up to `timeout` seconds. | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | Parameters | ||
---------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects | ||
ints are indices to self.history | ||||
strs are msg_ids | ||||
default: wait on all outstanding messages | ||||
timeout : float | ||||
a time in seconds, after which to give up. | ||||
default is -1, which means no timeout | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | Returns | ||
------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | True : when all msg_ids are done | ||
False : timeout reached, some msg_ids still outstanding | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3664 | if jobs is None: | ||
jobs = self.history | ||||
return self.client.wait(jobs, timeout) | ||||
MinRK
|
r3590 | |||
MinRK
|
r3665 | def abort(self, jobs=None, targets=None, block=None): | ||
MinRK
|
r3559 | """Abort jobs on my engines. | ||
Parameters | ||||
---------- | ||||
MinRK
|
r3639 | jobs : None, str, list of strs, optional | ||
MinRK
|
r3559 | if None: abort all jobs. | ||
else: abort specific msg_id(s). | ||||
""" | ||||
block = block if block is not None else self.block | ||||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
return self.client.abort(jobs=jobs, targets=targets, block=block) | ||||
MinRK
|
r3559 | |||
MinRK
|
r3665 | def queue_status(self, targets=None, verbose=False): | ||
MinRK
|
r3559 | """Fetch the Queue status of my engines""" | ||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
return self.client.queue_status(targets=targets, verbose=verbose) | ||||
MinRK
|
r3559 | |||
MinRK
|
r3639 | def purge_results(self, jobs=[], targets=[]): | ||
MinRK
|
r3559 | """Instruct the controller to forget specific results.""" | ||
if targets is None or targets == 'all': | ||||
MinRK
|
r3665 | targets = self.targets | ||
MinRK
|
r3639 | return self.client.purge_results(jobs=jobs, targets=targets) | ||
MinRK
|
r3667 | def shutdown(self, targets=None, restart=False, hub=False, block=None): | ||
"""Terminates one or more engine processes, optionally including the hub. | ||||
""" | ||||
block = self.block if block is None else block | ||||
if targets is None or targets == 'all': | ||||
targets = self.targets | ||||
return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block) | ||||
MinRK
|
r3639 | @spin_after | ||
def get_result(self, indices_or_msg_ids=None): | ||||
"""return one or more results, specified by history index or msg_id. | ||||
MinRK
|
r3635 | |||
MinRK
|
r3639 | See client.get_result for details. | ||
""" | ||||
if indices_or_msg_ids is None: | ||||
indices_or_msg_ids = -1 | ||||
if isinstance(indices_or_msg_ids, int): | ||||
indices_or_msg_ids = self.history[indices_or_msg_ids] | ||||
elif isinstance(indices_or_msg_ids, (list,tuple,set)): | ||||
indices_or_msg_ids = list(indices_or_msg_ids) | ||||
for i,index in enumerate(indices_or_msg_ids): | ||||
if isinstance(index, int): | ||||
indices_or_msg_ids[i] = self.history[index] | ||||
return self.client.get_result(indices_or_msg_ids) | ||||
MinRK
|
r3635 | #------------------------------------------------------------------- | ||
MinRK
|
r3636 | # Map | ||
#------------------------------------------------------------------- | ||||
def map(self, f, *sequences, **kwargs): | ||||
"""override in subclasses""" | ||||
raise NotImplementedError | ||||
def map_async(self, f, *sequences, **kwargs): | ||||
"""Parallel version of builtin `map`, using this view's engines. | ||||
This is equivalent to map(...block=False) | ||||
MinRK
|
r3639 | See `self.map` for details. | ||
MinRK
|
r3636 | """ | ||
if 'block' in kwargs: | ||||
raise TypeError("map_async doesn't take a `block` keyword argument.") | ||||
kwargs['block'] = False | ||||
return self.map(f,*sequences,**kwargs) | ||||
def map_sync(self, f, *sequences, **kwargs): | ||||
"""Parallel version of builtin `map`, using this view's engines. | ||||
This is equivalent to map(...block=True) | ||||
MinRK
|
r3639 | See `self.map` for details. | ||
MinRK
|
r3636 | """ | ||
if 'block' in kwargs: | ||||
raise TypeError("map_sync doesn't take a `block` keyword argument.") | ||||
kwargs['block'] = True | ||||
return self.map(f,*sequences,**kwargs) | ||||
MinRK
|
r3639 | def imap(self, f, *sequences, **kwargs): | ||
"""Parallel version of `itertools.imap`. | ||||
See `self.map` for details. | ||||
MinRK
|
r3664 | |||
MinRK
|
r3639 | """ | ||
return iter(self.map_async(f,*sequences, **kwargs)) | ||||
MinRK
|
r3636 | #------------------------------------------------------------------- | ||
MinRK
|
r3635 | # Decorators | ||
#------------------------------------------------------------------- | ||||
MinRK
|
r3664 | def remote(self, block=True, **flags): | ||
MinRK
|
r3635 | """Decorator for making a RemoteFunction""" | ||
MinRK
|
r3664 | block = self.block if block is None else block | ||
return remote(self, block=block, **flags) | ||||
MinRK
|
r3590 | |||
MinRK
|
r3664 | def parallel(self, dist='b', block=None, **flags): | ||
MinRK
|
r3636 | """Decorator for making a ParallelFunction""" | ||
block = self.block if block is None else block | ||||
MinRK
|
r3664 | return parallel(self, dist=dist, block=block, **flags) | ||
MinRK
|
r3543 | |||
Thomas Kluyver
|
r3886 | @skip_doctest | ||
MinRK
|
r3539 | class DirectView(View): | ||
MinRK
|
r3584 | """Direct Multiplexer View of one or more engines. | ||
These are created via indexed access to a client: | ||||
>>> dv_1 = client[1] | ||||
>>> dv_all = client[:] | ||||
>>> dv_even = client[::2] | ||||
>>> dv_some = client[1:3] | ||||
MinRK
|
r3624 | This object provides dictionary access to engine namespaces: | ||
# push a=5: | ||||
>>> dv['a'] = 5 | ||||
# pull 'foo': | ||||
>>> db['foo'] | ||||
MinRK
|
r3590 | |||
MinRK
|
r3584 | """ | ||
MinRK
|
r3539 | |||
MinRK
|
r3664 | def __init__(self, client=None, socket=None, targets=None): | ||
super(DirectView, self).__init__(client=client, socket=socket, targets=targets) | ||||
MinRK
|
r3665 | |||
@property | ||||
def importer(self): | ||||
"""sync_imports(local=True) as a property. | ||||
MinRK
|
r3664 | |||
MinRK
|
r3665 | See sync_imports for details. | ||
""" | ||||
return self.sync_imports(True) | ||||
@contextmanager | ||||
def sync_imports(self, local=True): | ||||
"""Context Manager for performing simultaneous local and remote imports. | ||||
'import x as y' will *not* work. The 'as y' part will simply be ignored. | ||||
>>> with view.sync_imports(): | ||||
... from numpy import recarray | ||||
importing recarray from numpy on engine(s) | ||||
""" | ||||
import __builtin__ | ||||
local_import = __builtin__.__import__ | ||||
modules = set() | ||||
results = [] | ||||
@util.interactive | ||||
def remote_import(name, fromlist, level): | ||||
"""the function to be passed to apply, that actually performs the import | ||||
on the engine, and loads up the user namespace. | ||||
""" | ||||
import sys | ||||
user_ns = globals() | ||||
mod = __import__(name, fromlist=fromlist, level=level) | ||||
if fromlist: | ||||
for key in fromlist: | ||||
user_ns[key] = getattr(mod, key) | ||||
else: | ||||
user_ns[name] = sys.modules[name] | ||||
def view_import(name, globals={}, locals={}, fromlist=[], level=-1): | ||||
"""the drop-in replacement for __import__, that optionally imports | ||||
locally as well. | ||||
""" | ||||
# don't override nested imports | ||||
save_import = __builtin__.__import__ | ||||
__builtin__.__import__ = local_import | ||||
if imp.lock_held(): | ||||
# this is a side-effect import, don't do it remotely, or even | ||||
# ignore the local effects | ||||
return local_import(name, globals, locals, fromlist, level) | ||||
imp.acquire_lock() | ||||
if local: | ||||
mod = local_import(name, globals, locals, fromlist, level) | ||||
else: | ||||
raise NotImplementedError("remote-only imports not yet implemented") | ||||
imp.release_lock() | ||||
key = name+':'+','.join(fromlist or []) | ||||
if level == -1 and key not in modules: | ||||
modules.add(key) | ||||
if fromlist: | ||||
print "importing %s from %s on engine(s)"%(','.join(fromlist), name) | ||||
else: | ||||
print "importing %s on engine(s)"%name | ||||
results.append(self.apply_async(remote_import, name, fromlist, level)) | ||||
# restore override | ||||
__builtin__.__import__ = save_import | ||||
return mod | ||||
# override __import__ | ||||
__builtin__.__import__ = view_import | ||||
try: | ||||
# enter the block | ||||
yield | ||||
except ImportError: | ||||
if not local: | ||||
# ignore import errors if not doing local imports | ||||
pass | ||||
finally: | ||||
# always restore __import__ | ||||
__builtin__.__import__ = local_import | ||||
for r in results: | ||||
# raise possible remote ImportErrors here | ||||
r.get() | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | @sync_results | ||
MinRK
|
r3635 | @save_ids | ||
MinRK
|
r3665 | def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None): | ||
MinRK
|
r3664 | """calls f(*args, **kwargs) on remote engines, returning the result. | ||
This method sets all of `apply`'s flags via this View's attributes. | ||||
Parameters | ||||
---------- | ||||
f : callable | ||||
args : list [default: empty] | ||||
kwargs : dict [default: empty] | ||||
MinRK
|
r3665 | targets : target list [default: self.targets] | ||
where to run | ||||
MinRK
|
r3664 | block : bool [default: self.block] | ||
whether to block | ||||
track : bool [default: self.track] | ||||
whether to ask zmq to track the message, for safe non-copying sends | ||||
Returns | ||||
------- | ||||
if self.block is False: | ||||
returns AsyncResult | ||||
else: | ||||
returns actual result of f(*args, **kwargs) on the engine(s) | ||||
This will be a list of self.targets is also a list (even length 1), or | ||||
the single result if self.targets is an integer engine id | ||||
""" | ||||
args = [] if args is None else args | ||||
kwargs = {} if kwargs is None else kwargs | ||||
block = self.block if block is None else block | ||||
track = self.track if track is None else track | ||||
MinRK
|
r3665 | targets = self.targets if targets is None else targets | ||
_idents = self.client._build_targets(targets)[0] | ||||
MinRK
|
r3664 | msg_ids = [] | ||
trackers = [] | ||||
MinRK
|
r3665 | for ident in _idents: | ||
MinRK
|
r3664 | msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, | ||
ident=ident) | ||||
if track: | ||||
trackers.append(msg['tracker']) | ||||
msg_ids.append(msg['msg_id']) | ||||
tracker = None if track is False else zmq.MessageTracker(*trackers) | ||||
MinRK
|
r3665 | ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker) | ||
MinRK
|
r3664 | if block: | ||
try: | ||||
return ar.get() | ||||
except KeyboardInterrupt: | ||||
pass | ||||
return ar | ||||
@spin_after | ||||
MinRK
|
r3635 | def map(self, f, *sequences, **kwargs): | ||
MinRK
|
r3664 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult | ||
MinRK
|
r3639 | |||
Parallel version of builtin `map`, using this View's `targets`. | ||||
MinRK
|
r3635 | |||
There will be one task per target, so work will be chunked | ||||
if the sequences are longer than `targets`. | ||||
Results can be iterated as they are ready, but will become available in chunks. | ||||
Parameters | ||||
---------- | ||||
f : callable | ||||
function to be mapped | ||||
*sequences: one or more sequences of matching length | ||||
the sequences to be distributed and passed to `f` | ||||
block : bool | ||||
whether to wait for the result or not [default self.block] | ||||
Returns | ||||
------- | ||||
if block=False: | ||||
AsyncMapResult | ||||
An object like AsyncResult, but which reassembles the sequence of results | ||||
into a single list. AsyncMapResults can be iterated through before all | ||||
results are complete. | ||||
MinRK
|
r3639 | else: | ||
list | ||||
MinRK
|
r3635 | the result of map(f,*sequences) | ||
""" | ||||
MinRK
|
r3664 | block = kwargs.pop('block', self.block) | ||
MinRK
|
r3635 | for k in kwargs.keys(): | ||
MinRK
|
r3664 | if k not in ['block', 'track']: | ||
MinRK
|
r3635 | raise TypeError("invalid keyword arg, %r"%k) | ||
assert len(sequences) > 0, "must have some sequences to map onto!" | ||||
MinRK
|
r3664 | pf = ParallelFunction(self, f, block=block, **kwargs) | ||
MinRK
|
r3635 | return pf.map(*sequences) | ||
MinRK
|
r3665 | def execute(self, code, targets=None, block=None): | ||
MinRK
|
r3664 | """Executes `code` on `targets` in blocking or nonblocking manner. | ||
MinRK
|
r3642 | |||
MinRK
|
r3664 | ``execute`` is always `bound` (affects engine namespace) | ||
MinRK
|
r3642 | |||
MinRK
|
r3664 | Parameters | ||
---------- | ||||
code : str | ||||
the code string to be executed | ||||
block : bool | ||||
whether or not to wait until done to return | ||||
default: self.block | ||||
""" | ||||
MinRK
|
r3665 | return self._really_apply(util._execute, args=(code,), block=block, targets=targets) | ||
MinRK
|
r3590 | |||
MinRK
|
r3665 | def run(self, filename, targets=None, block=None): | ||
MinRK
|
r3664 | """Execute contents of `filename` on my engine(s). | ||
MinRK
|
r3642 | |||
MinRK
|
r3664 | This simply reads the contents of the file and calls `execute`. | ||
MinRK
|
r3642 | |||
MinRK
|
r3664 | Parameters | ||
---------- | ||||
filename : str | ||||
The path to the file | ||||
targets : int/str/list of ints/strs | ||||
the engines on which to execute | ||||
default : all | ||||
block : bool | ||||
whether or not to wait until done | ||||
default: self.block | ||||
""" | ||||
with open(filename, 'r') as f: | ||||
# add newline in case of trailing indented whitespace | ||||
# which will cause SyntaxError | ||||
code = f.read()+'\n' | ||||
MinRK
|
r3665 | return self.execute(code, block=block, targets=targets) | ||
MinRK
|
r3642 | |||
MinRK
|
r3539 | def update(self, ns): | ||
MinRK
|
r3664 | """update remote namespace with dict `ns` | ||
See `push` for details. | ||||
""" | ||||
return self.push(ns, block=self.block, track=self.track) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3665 | def push(self, ns, targets=None, block=None, track=None): | ||
MinRK
|
r3664 | """update remote namespace with dict `ns` | ||
MinRK
|
r3642 | |||
MinRK
|
r3664 | Parameters | ||
---------- | ||||
MinRK
|
r3642 | |||
MinRK
|
r3664 | ns : dict | ||
dict of keys with which to update engine namespace(s) | ||||
block : bool [default : self.block] | ||||
whether to wait to be notified of engine receipt | ||||
""" | ||||
block = block if block is not None else self.block | ||||
track = track if track is not None else self.track | ||||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
MinRK
|
r3664 | # applier = self.apply_sync if block else self.apply_async | ||
if not isinstance(ns, dict): | ||||
raise TypeError("Must be a dict, not %s"%type(ns)) | ||||
MinRK
|
r3665 | return self._really_apply(util._push, (ns,), block=block, track=track, targets=targets) | ||
MinRK
|
r3641 | |||
MinRK
|
r3539 | def get(self, key_s): | ||
"""get object(s) by `key_s` from remote namespace | ||||
MinRK
|
r3664 | |||
see `pull` for details. | ||||
""" | ||||
MinRK
|
r3539 | # block = block if block is not None else self.block | ||
MinRK
|
r3664 | return self.pull(key_s, block=True) | ||
MinRK
|
r3539 | |||
MinRK
|
r3689 | def pull(self, names, targets=None, block=None): | ||
MinRK
|
r3664 | """get object(s) by `name` from remote namespace | ||
MinRK
|
r3559 | will return one object if it is a key. | ||
MinRK
|
r3664 | can also take a list of keys, in which case it will return a list of objects. | ||
""" | ||||
MinRK
|
r3559 | block = block if block is not None else self.block | ||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
MinRK
|
r3664 | applier = self.apply_sync if block else self.apply_async | ||
if isinstance(names, basestring): | ||||
pass | ||||
elif isinstance(names, (list,tuple,set)): | ||||
for key in names: | ||||
if not isinstance(key, basestring): | ||||
raise TypeError("keys must be str, not type %r"%type(key)) | ||||
else: | ||||
raise TypeError("names must be strs, not %r"%names) | ||||
MinRK
|
r3665 | return self._really_apply(util._pull, (names,), block=block, targets=targets) | ||
MinRK
|
r3539 | |||
MinRK
|
r3665 | def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None): | ||
MinRK
|
r3587 | """ | ||
Partition a Python sequence and send the partitions to a set of engines. | ||||
""" | ||||
block = block if block is not None else self.block | ||||
MinRK
|
r3664 | track = track if track is not None else self.track | ||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
MinRK
|
r3664 | mapObject = Map.dists[dist]() | ||
nparts = len(targets) | ||||
msg_ids = [] | ||||
trackers = [] | ||||
for index, engineid in enumerate(targets): | ||||
partition = mapObject.getPartition(seq, index, nparts) | ||||
if flatten and len(partition) == 1: | ||||
MinRK
|
r3665 | ns = {key: partition[0]} | ||
MinRK
|
r3664 | else: | ||
MinRK
|
r3665 | ns = {key: partition} | ||
r = self.push(ns, block=False, track=track, targets=engineid) | ||||
MinRK
|
r3664 | msg_ids.extend(r.msg_ids) | ||
if track: | ||||
trackers.append(r._tracker) | ||||
MinRK
|
r3587 | |||
MinRK
|
r3664 | if track: | ||
tracker = zmq.MessageTracker(*trackers) | ||||
else: | ||||
tracker = None | ||||
r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker) | ||||
if block: | ||||
r.wait() | ||||
else: | ||||
return r | ||||
MinRK
|
r3587 | |||
MinRK
|
r3590 | @sync_results | ||
@save_ids | ||||
MinRK
|
r3665 | def gather(self, key, dist='b', targets=None, block=None): | ||
MinRK
|
r3587 | """ | ||
Gather a partitioned sequence on a set of engines as a single local seq. | ||||
""" | ||||
block = block if block is not None else self.block | ||||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
MinRK
|
r3664 | mapObject = Map.dists[dist]() | ||
msg_ids = [] | ||||
MinRK
|
r3665 | |||
for index, engineid in enumerate(targets): | ||||
msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids) | ||||
MinRK
|
r3587 | |||
MinRK
|
r3664 | r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather') | ||
if block: | ||||
try: | ||||
return r.get() | ||||
except KeyboardInterrupt: | ||||
pass | ||||
return r | ||||
MinRK
|
r3587 | |||
MinRK
|
r3539 | def __getitem__(self, key): | ||
return self.get(key) | ||||
MinRK
|
r3559 | def __setitem__(self,key, value): | ||
MinRK
|
r3539 | self.update({key:value}) | ||
MinRK
|
r3665 | def clear(self, targets=None, block=False): | ||
MinRK
|
r3540 | """Clear the remote namespaces on my engines.""" | ||
block = block if block is not None else self.block | ||||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
return self.client.clear(targets=targets, block=block) | ||||
MinRK
|
r3540 | |||
MinRK
|
r3665 | def kill(self, targets=None, block=True): | ||
MinRK
|
r3540 | """Kill my engines.""" | ||
block = block if block is not None else self.block | ||||
MinRK
|
r3665 | targets = targets if targets is not None else self.targets | ||
return self.client.kill(targets=targets, block=block) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3590 | #---------------------------------------- | ||
# activate for %px,%autopx magics | ||||
#---------------------------------------- | ||||
def activate(self): | ||||
"""Make this `View` active for parallel magic commands. | ||||
IPython has a magic command syntax to work with `MultiEngineClient` objects. | ||||
In a given IPython session there is a single active one. While | ||||
there can be many `Views` created and used by the user, | ||||
there is only one active one. The active `View` is used whenever | ||||
the magic commands %px and %autopx are used. | ||||
The activate() method is called on a given `View` to make it | ||||
active. Once this has been done, the magic commands can be used. | ||||
""" | ||||
try: | ||||
# This is injected into __builtins__. | ||||
ip = get_ipython() | ||||
except NameError: | ||||
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | ||||
else: | ||||
pmagic = ip.plugin_manager.get_plugin('parallelmagic') | ||||
MinRK
|
r3735 | if pmagic is None: | ||
ip.magic_load_ext('parallelmagic') | ||||
pmagic = ip.plugin_manager.get_plugin('parallelmagic') | ||||
pmagic.active_view = self | ||||
MinRK
|
r3590 | |||
MinRK
|
r3641 | |||
Thomas Kluyver
|
r3886 | @skip_doctest | ||
MinRK
|
r3539 | class LoadBalancedView(View): | ||
MinRK
|
r3635 | """An load-balancing View that only executes via the Task scheduler. | ||
MinRK
|
r3584 | |||
MinRK
|
r3635 | Load-balanced views can be created with the client's `view` method: | ||
MinRK
|
r3584 | |||
MinRK
|
r3664 | >>> v = client.load_balanced_view() | ||
MinRK
|
r3584 | |||
MinRK
|
r3635 | or targets can be specified, to restrict the potential destinations: | ||
MinRK
|
r3584 | |||
MinRK
|
r3664 | >>> v = client.client.load_balanced_view(([1,3]) | ||
MinRK
|
r3625 | |||
which would restrict loadbalancing to between engines 1 and 3. | ||||
MinRK
|
r3584 | |||
""" | ||||
MinRK
|
r3559 | |||
MinRK
|
r3665 | follow=Any() | ||
after=Any() | ||||
timeout=CFloat() | ||||
MinRK
|
r3873 | retries = CInt(0) | ||
MinRK
|
r3625 | |||
MinRK
|
r3665 | _task_scheme = Any() | ||
MinRK
|
r3873 | _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']) | ||
MinRK
|
r3665 | |||
def __init__(self, client=None, socket=None, **flags): | ||||
super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags) | ||||
MinRK
|
r3664 | self._task_scheme=client._task_scheme | ||
MinRK
|
r3635 | |||
def _validate_dependency(self, dep): | ||||
"""validate a dependency. | ||||
For use in `set_flags`. | ||||
""" | ||||
MinRK
|
r4172 | if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)): | ||
MinRK
|
r3635 | return True | ||
elif isinstance(dep, (list,set, tuple)): | ||||
for d in dep: | ||||
MinRK
|
r4172 | if not isinstance(d, (basestring, AsyncResult)): | ||
MinRK
|
r3635 | return False | ||
elif isinstance(dep, dict): | ||||
if set(dep.keys()) != set(Dependency().as_dict().keys()): | ||||
return False | ||||
if not isinstance(dep['msg_ids'], list): | ||||
return False | ||||
for d in dep['msg_ids']: | ||||
MinRK
|
r4172 | if not isinstance(d, basestring): | ||
MinRK
|
r3635 | return False | ||
else: | ||||
return False | ||||
MinRK
|
r3664 | |||
return True | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | def _render_dependency(self, dep): | ||
"""helper for building jsonable dependencies from various input forms.""" | ||||
if isinstance(dep, Dependency): | ||||
return dep.as_dict() | ||||
elif isinstance(dep, AsyncResult): | ||||
return dep.msg_ids | ||||
elif dep is None: | ||||
return [] | ||||
else: | ||||
# pass to Dependency constructor | ||||
return list(Dependency(dep)) | ||||
MinRK
|
r3635 | def set_flags(self, **kwargs): | ||
"""set my attribute flags by keyword. | ||||
MinRK
|
r3639 | A View is a wrapper for the Client's apply method, but with attributes | ||
that specify keyword arguments, those attributes can be set by keyword | ||||
argument with this method. | ||||
MinRK
|
r3635 | |||
Parameters | ||||
---------- | ||||
block : bool | ||||
whether to wait for results | ||||
MinRK
|
r3655 | track : bool | ||
whether to create a MessageTracker to allow the user to | ||||
safely edit after arrays and buffers during non-copying | ||||
sends. | ||||
MinRK
|
r3873 | |||
MinRK
|
r3664 | after : Dependency or collection of msg_ids | ||
Only for load-balanced execution (targets=None) | ||||
Specify a list of msg_ids as a time-based dependency. | ||||
This job will only be run *after* the dependencies | ||||
have been met. | ||||
follow : Dependency or collection of msg_ids | ||||
Only for load-balanced execution (targets=None) | ||||
Specify a list of msg_ids as a location-based dependency. | ||||
This job will only be run on an engine where this dependency | ||||
is met. | ||||
timeout : float/int or None | ||||
Only for load-balanced execution (targets=None) | ||||
Specify an amount of time (in seconds) for the scheduler to | ||||
wait for dependencies to be met before failing with a | ||||
DependencyTimeout. | ||||
MinRK
|
r3873 | |||
retries : int | ||||
Number of times a task will be retried on failure. | ||||
MinRK
|
r3635 | """ | ||
super(LoadBalancedView, self).set_flags(**kwargs) | ||||
for name in ('follow', 'after'): | ||||
if name in kwargs: | ||||
value = kwargs[name] | ||||
if self._validate_dependency(value): | ||||
setattr(self, name, value) | ||||
else: | ||||
raise ValueError("Invalid dependency: %r"%value) | ||||
if 'timeout' in kwargs: | ||||
t = kwargs['timeout'] | ||||
MinRK
|
r3664 | if not isinstance(t, (int, long, float, type(None))): | ||
MinRK
|
r3635 | raise TypeError("Invalid type for timeout: %r"%type(t)) | ||
if t is not None: | ||||
if t < 0: | ||||
raise ValueError("Invalid timeout: %s"%t) | ||||
self.timeout = t | ||||
MinRK
|
r3664 | |||
@sync_results | ||||
@save_ids | ||||
def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, | ||||
MinRK
|
r3665 | after=None, follow=None, timeout=None, | ||
MinRK
|
r3873 | targets=None, retries=None): | ||
MinRK
|
r3664 | """calls f(*args, **kwargs) on a remote engine, returning the result. | ||
This method temporarily sets all of `apply`'s flags for a single call. | ||||
Parameters | ||||
---------- | ||||
f : callable | ||||
args : list [default: empty] | ||||
kwargs : dict [default: empty] | ||||
block : bool [default: self.block] | ||||
whether to block | ||||
track : bool [default: self.track] | ||||
whether to ask zmq to track the message, for safe non-copying sends | ||||
!!!!!! TODO: THE REST HERE !!!! | ||||
Returns | ||||
------- | ||||
if self.block is False: | ||||
returns AsyncResult | ||||
else: | ||||
returns actual result of f(*args, **kwargs) on the engine(s) | ||||
This will be a list of self.targets is also a list (even length 1), or | ||||
the single result if self.targets is an integer engine id | ||||
""" | ||||
# validate whether we can run | ||||
if self._socket.closed: | ||||
msg = "Task farming is disabled" | ||||
if self._task_scheme == 'pure': | ||||
msg += " because the pure ZMQ scheduler cannot handle" | ||||
msg += " disappearing engines." | ||||
raise RuntimeError(msg) | ||||
if self._task_scheme == 'pure': | ||||
MinRK
|
r3873 | # pure zmq scheme doesn't support extra features | ||
msg = "Pure ZMQ scheduler doesn't support the following flags:" | ||||
"follow, after, retries, targets, timeout" | ||||
if (follow or after or retries or targets or timeout): | ||||
# hard fail on Scheduler flags | ||||
MinRK
|
r3664 | raise RuntimeError(msg) | ||
if isinstance(f, dependent): | ||||
# soft warn on functional dependencies | ||||
warnings.warn(msg, RuntimeWarning) | ||||
# build args | ||||
args = [] if args is None else args | ||||
kwargs = {} if kwargs is None else kwargs | ||||
block = self.block if block is None else block | ||||
track = self.track if track is None else track | ||||
after = self.after if after is None else after | ||||
MinRK
|
r3873 | retries = self.retries if retries is None else retries | ||
MinRK
|
r3664 | follow = self.follow if follow is None else follow | ||
timeout = self.timeout if timeout is None else timeout | ||||
MinRK
|
r3665 | targets = self.targets if targets is None else targets | ||
MinRK
|
r3873 | if not isinstance(retries, int): | ||
raise TypeError('retries must be int, not %r'%type(retries)) | ||||
MinRK
|
r3665 | if targets is None: | ||
idents = [] | ||||
else: | ||||
idents = self.client._build_targets(targets)[0] | ||||
MinRK
|
r4155 | # ensure *not* bytes | ||
idents = [ ident.decode() for ident in idents ] | ||||
MinRK
|
r3665 | |||
MinRK
|
r3664 | after = self._render_dependency(after) | ||
follow = self._render_dependency(follow) | ||||
MinRK
|
r3873 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | ||
MinRK
|
r3664 | |||
msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, | ||||
subheader=subheader) | ||||
tracker = None if track is False else msg['tracker'] | ||||
ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker) | ||||
if block: | ||||
try: | ||||
return ar.get() | ||||
except KeyboardInterrupt: | ||||
pass | ||||
return ar | ||||
MinRK
|
r3635 | @spin_after | ||
@save_ids | ||||
def map(self, f, *sequences, **kwargs): | ||||
MinRK
|
r3664 | """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult | ||
MinRK
|
r3639 | |||
Parallel version of builtin `map`, load-balanced by this View. | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | `block`, and `chunksize` can be specified by keyword only. | ||
MinRK
|
r3639 | |||
MinRK
|
r3664 | Each `chunksize` elements will be a separate task, and will be | ||
MinRK
|
r3639 | load-balanced. This lets individual elements be available for iteration | ||
as soon as they arrive. | ||||
MinRK
|
r3635 | |||
Parameters | ||||
---------- | ||||
f : callable | ||||
function to be mapped | ||||
*sequences: one or more sequences of matching length | ||||
the sequences to be distributed and passed to `f` | ||||
block : bool | ||||
whether to wait for the result or not [default self.block] | ||||
MinRK
|
r3655 | track : bool | ||
whether to create a MessageTracker to allow the user to | ||||
safely edit after arrays and buffers during non-copying | ||||
sends. | ||||
MinRK
|
r3664 | chunksize : int | ||
MinRK
|
r3639 | how many elements should be in each task [default 1] | ||
MinRK
|
r3635 | |||
Returns | ||||
------- | ||||
if block=False: | ||||
AsyncMapResult | ||||
An object like AsyncResult, but which reassembles the sequence of results | ||||
into a single list. AsyncMapResults can be iterated through before all | ||||
results are complete. | ||||
else: | ||||
the result of map(f,*sequences) | ||||
""" | ||||
MinRK
|
r3636 | # default | ||
MinRK
|
r3635 | block = kwargs.get('block', self.block) | ||
MinRK
|
r3664 | chunksize = kwargs.get('chunksize', 1) | ||
MinRK
|
r3636 | |||
keyset = set(kwargs.keys()) | ||||
MinRK
|
r3664 | extra_keys = keyset.difference_update(set(['block', 'chunksize'])) | ||
MinRK
|
r3636 | if extra_keys: | ||
raise TypeError("Invalid kwargs: %s"%list(extra_keys)) | ||||
MinRK
|
r3635 | |||
assert len(sequences) > 0, "must have some sequences to map onto!" | ||||
MinRK
|
r3664 | pf = ParallelFunction(self, f, block=block, chunksize=chunksize) | ||
MinRK
|
r3635 | return pf.map(*sequences) | ||
MinRK
|
r3665 | |||
Thomas Kluyver
|
r3886 | __all__ = ['LoadBalancedView', 'DirectView'] | ||