Show More
remotefunction.py
264 lines
| 8.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Remote Functions and decorators for Views.

Authors:

* Brian Granger
* Min RK
"""
MinRK
|
r3588 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3588 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4155 | from __future__ import division | ||
MinRK
|
r4159 | import sys | ||
MinRK
|
r3636 | import warnings | ||
MinRK
|
r10073 | from IPython.external.decorator import decorator | ||
Thomas Kluyver
|
r3886 | from IPython.testing.skipdoctest import skip_doctest | ||
MinRK
|
r3641 | |||
MinRK
|
r3642 | from . import map as Map | ||
from .asyncresult import AsyncMapResult | ||||
MinRK
|
r3588 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r5821 | # Functions and Decorators | ||
MinRK
|
r3588 | #----------------------------------------------------------------------------- | ||
Thomas Kluyver
|
@skip_doctest
def remote(view, block=None, **flags):
    """Build a decorator that replaces a function with a RemoteFunction.

    The decorated function is wrapped in a RemoteFunction bound to `view`;
    `block` and any additional keyword `flags` are handed to the
    RemoteFunction constructor unchanged.

    Example:

    In [1]: @remote(view, block=True)
       ...: def func(a):
       ...:    pass
    """
    def remote_function(f):
        # `block` is a named parameter of remote(), so it can never also be
        # present in `flags`; merging cannot clobber a caller-supplied key.
        options = dict(flags, block=block)
        return RemoteFunction(view, f, **options)

    return remote_function
Thomas Kluyver
|
@skip_doctest
def parallel(view, dist='b', block=None, ordered=True, **flags):
    """Build a decorator that replaces a function with a ParallelFunction.

    The decorated function is wrapped in a ParallelFunction bound to `view`;
    `dist`, `block`, `ordered` and any additional keyword `flags` are handed
    to the ParallelFunction constructor unchanged.

    Example:

    In [1]: @parallel(view, block=True)
       ...: def func(a):
       ...:    pass
    """
    def parallel_function(f):
        # dist/block/ordered are named parameters of parallel(), so they can
        # never also be present in `flags`.
        options = dict(flags, dist=dist, block=block, ordered=ordered)
        return ParallelFunction(view, f, **options)

    return parallel_function
MinRK
|
def getname(f):
    """Get the name of an object.

    For use in case of callables that are not functions, and
    thus may not have __name__ defined.

    Order: f.__name__ > f.name > str(f)

    Any ordinary exception raised while reading an attribute is treated as
    "attribute not available" and the next candidate is tried.  Exceptions
    that do not derive from Exception (KeyboardInterrupt, SystemExit, ...)
    are deliberately allowed to propagate.
    """
    try:
        return f.__name__
    except Exception:
        # best-effort lookup: fall through to the next candidate
        pass
    try:
        return f.name
    except Exception:
        pass
    return str(f)
MinRK
|
@decorator
def sync_view_results(f, self, *args, **kwargs):
    """sync relevant results from self.client to our results attribute.

    This is a clone of view.sync_results, but for remote functions

    `self` is the object whose method `f` is being wrapped; it must expose
    a `view` attribute.  The view's `_in_sync_results` flag is used as a
    guard: if it is already set, `f` is simply called; otherwise the flag is
    set for the duration of the call and `view._sync_results()` is invoked
    afterwards, whether or not `f` raised.
    """
    view = self.view
    if view._in_sync_results:
        # already inside a syncing call: let the outermost one do the sync
        return f(self, *args, **kwargs)
    view._in_sync_results = True
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # always clear the guard and sync, even when f raised
        view._in_sync_results = False
        view._sync_results()
    return ret
MinRK
|
r3588 | #-------------------------------------------------------------------------- | ||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
class RemoteFunction(object):
    """Callable wrapper that executes a function through a View.

    Parameters
    ----------

    view : View instance
        The view used to execute the function
    f : callable
        The function to run remotely
    block : bool [default: None]
        Whether to wait for the result.  When None, the `block`
        attribute of `view` at call time is used instead.

    **flags : remaining kwargs are passed to View.temp_flags
    """

    view = None   # the remote connection
    func = None   # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the view and return what apply returns."""
        view = self.view
        # an explicit per-function setting wins over the view's current one
        if self.block is None:
            block = view.block
        else:
            block = self.block
        with view.temp_flags(block=block, **self.flags):
            return view.apply(self.func, *args, **kwargs)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | |||
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a load-balanced manner
    ordered : bool [default: True]
        Passed through unchanged to AsyncMapResult as its `ordered` argument.
        NOTE(review): the previous description was cut off after "Whether";
        presumably this selects whether results are yielded in submission
        order -- confirm against AsyncMapResult.
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None
    ordered = None
    mapObject = None

    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize
        self.ordered = ordered

        # `dist` selects the partitioning class from the Map module's registry
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    @sync_view_results
    def __call__(self, *sequences):
        """Partition `sequences`, submit one task per partition, gather results.

        Returns the gathered result when blocking, otherwise the
        AsyncMapResult.  If the blocking wait is interrupted with
        KeyboardInterrupt, the AsyncMapResult is returned instead.

        Raises ValueError if the sequences do not all have the same length.
        """
        client = self.view.client
        # an explicit per-function setting wins over the view's current one,
        # matching RemoteFunction.__call__ and the documented default
        block = self.view.block if self.block is None else self.block

        # check that the length of sequences match
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i' % (len_0, len(s))
                raise ValueError(msg)

        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # ceil(len_0 / chunksize) using integer arithmetic
                nparts = len_0 // self.chunksize + int(len_0 % self.chunksize > 0)
            else:
                nparts = len_0
            targets = [None] * nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed:
            targets = self.view.targets
            # 'all' is lazily evaluated at execution time, which is now:
            if targets == 'all':
                targets = client._build_targets(targets)[1]
            elif isinstance(targets, int):
                # single-engine view, targets must be iterable
                targets = [targets]
            nparts = len(targets)

        # The callable actually submitted does not depend on the target, so
        # pick it once.  `_map` is the flag set by self.map().
        mapping = hasattr(self, '_map')
        if mapping:
            if sys.version_info[0] >= 3:
                # Python 3's map is lazy; force a list of results
                f = lambda f, *sequences: list(map(f, *sequences))
            else:
                f = map
        else:
            f = self.func

        msg_ids = []
        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                if len(part) == 0:
                    continue
                args.append(part)
            if not args:
                # nothing to send for this partition
                continue

            if mapping:
                # element-wise mode: the wrapped function becomes the first
                # argument of the map call
                args = [self.func] + args

            view = self.view if balanced else client[t]
            # always submit non-blocking; blocking is handled after gathering
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.append(ar.msg_ids[0])

        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
                            fname=getname(self.func),
                            ordered=self.ordered
                        )

        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely.

        This should behave very much like the builtin map, but return an AsyncMapResult
        if self.block is False.
        """
        # set _map as a flag for use inside self.__call__
        self._map = True
        try:
            ret = self.__call__(*sequences)
        finally:
            del self._map
        return ret
Thomas Kluyver
|
# Public API of this module.
__all__ = [
    'remote',
    'parallel',
    'RemoteFunction',
    'ParallelFunction',
]