Show More
remotefunction.py
282 lines
| 8.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """Remote Functions and decorators for Views. | ||
Authors: | ||||
* Brian Granger | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3588 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3588 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4155 | from __future__ import division | ||
MinRK
|
r4159 | import sys | ||
MinRK
|
r3636 | import warnings | ||
MinRK
|
r10073 | from IPython.external.decorator import decorator | ||
Thomas Kluyver
|
r3886 | from IPython.testing.skipdoctest import skip_doctest | ||
MinRK
|
r3641 | |||
MinRK
|
r3642 | from . import map as Map | ||
from .asyncresult import AsyncMapResult | ||||
MinRK
|
r3588 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r5821 | # Functions and Decorators | ||
MinRK
|
r3588 | #----------------------------------------------------------------------------- | ||
Thomas Kluyver
|
r3886 | @skip_doctest | ||
MinRK
|
r3664 | def remote(view, block=None, **flags): | ||
MinRK
|
r3588 | """Turn a function into a remote function. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | This method can be used for map: | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | In [1]: @remote(view,block=True) | ||
MinRK
|
r3641 | ...: def func(a): | ||
...: pass | ||||
MinRK
|
r3588 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | def remote_function(f): | ||
MinRK
|
r3664 | return RemoteFunction(view, f, block=block, **flags) | ||
MinRK
|
r3588 | return remote_function | ||
Thomas Kluyver
|
r3886 | @skip_doctest | ||
MinRK
|
r5171 | def parallel(view, dist='b', block=None, ordered=True, **flags): | ||
MinRK
|
r3588 | """Turn a function into a parallel remote function. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | This method can be used for map: | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | In [1]: @parallel(view, block=True) | ||
MinRK
|
r3641 | ...: def func(a): | ||
...: pass | ||||
MinRK
|
r3588 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | def parallel_function(f): | ||
MinRK
|
r5171 | return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags) | ||
MinRK
|
r3588 | return parallel_function | ||
MinRK
|
r5821 | def getname(f): | ||
"""Get the name of an object. | ||||
For use in case of callables that are not functions, and | ||||
thus may not have __name__ defined. | ||||
Order: f.__name__ > f.name > str(f) | ||||
""" | ||||
try: | ||||
return f.__name__ | ||||
except: | ||||
pass | ||||
try: | ||||
return f.name | ||||
except: | ||||
pass | ||||
return str(f) | ||||
MinRK
|
r10073 | @decorator | ||
def sync_view_results(f, self, *args, **kwargs): | ||||
"""sync relevant results from self.client to our results attribute. | ||||
This is a clone of view.sync_results, but for remote functions | ||||
""" | ||||
view = self.view | ||||
if view._in_sync_results: | ||||
return f(self, *args, **kwargs) | ||||
print 'in sync results', f | ||||
view._in_sync_results = True | ||||
try: | ||||
ret = f(self, *args, **kwargs) | ||||
finally: | ||||
view._in_sync_results = False | ||||
view._sync_results() | ||||
return ret | ||||
MinRK
|
r3588 | #-------------------------------------------------------------------------- | ||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
class RemoteFunction(object): | ||||
"""Turn an existing function into a remote function. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | view : View instance | ||
The view to be used for execution | ||||
MinRK
|
r3588 | f : callable | ||
The function to be wrapped into a remote function | ||||
block : bool [default: None] | ||||
Whether to wait for results or not. The default behavior is | ||||
MinRK
|
r3664 | to use the current `block` attribute of `view` | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | **flags : remaining kwargs are passed to View.temp_flags | ||
MinRK
|
r3588 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | view = None # the remote connection | ||
MinRK
|
r3588 | func = None # the wrapped function | ||
block = None # whether to block | ||||
MinRK
|
r3664 | flags = None # dict of extra kwargs for temp_flags | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | def __init__(self, view, f, block=None, **flags): | ||
self.view = view | ||||
MinRK
|
r3588 | self.func = f | ||
self.block=block | ||||
MinRK
|
r3664 | self.flags=flags | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | def __call__(self, *args, **kwargs): | ||
MinRK
|
r3664 | block = self.view.block if self.block is None else self.block | ||
with self.view.temp_flags(block=block, **self.flags): | ||||
return self.view.apply(self.func, *args, **kwargs) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | |||
class ParallelFunction(RemoteFunction): | ||||
MinRK
|
r3644 | """Class for mapping a function to sequences. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3644 | This will distribute the sequences according the a mapper, and call | ||
the function on each sub-sequence. If called via map, then the function | ||||
will be called once on each element, rather that each sub-sequence. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3644 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | view : View instance | ||
The view to be used for execution | ||||
MinRK
|
r3644 | f : callable | ||
The function to be wrapped into a remote function | ||||
MinRK
|
r3664 | dist : str [default: 'b'] | ||
The key for which mapObject to use to distribute sequences | ||||
options are: | ||||
* 'b' : use contiguous chunks in order | ||||
* 'r' : use round-robin striping | ||||
MinRK
|
r3644 | block : bool [default: None] | ||
Whether to wait for results or not. The default behavior is | ||||
MinRK
|
r3664 | to use the current `block` attribute of `view` | ||
chunksize : int or None | ||||
MinRK
|
r3644 | The size of chunk to use when breaking up sequences in a load-balanced manner | ||
MinRK
|
r5171 | ordered : bool [default: True] | ||
MinRK
|
r10568 | Whether the result should be kept in order. If False, | ||
results become available as they arrive, regardless of submission order. | ||||
MinRK
|
r3664 | **flags : remaining kwargs are passed to View.temp_flags | ||
MinRK
|
r3644 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10568 | chunksize = None | ||
ordered = None | ||||
mapObject = None | ||||
MinRK
|
r10566 | _mapping = False | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r5171 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): | ||
MinRK
|
r3664 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) | ||
self.chunksize = chunksize | ||||
MinRK
|
r5171 | self.ordered = ordered | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | mapClass = Map.dists[dist] | ||
self.mapObject = mapClass() | ||||
MinRK
|
r10566 | |||
MinRK
|
r10073 | @sync_view_results | ||
MinRK
|
r3588 | def __call__(self, *sequences): | ||
MinRK
|
r5290 | client = self.view.client | ||
MinRK
|
r10566 | lens = [] | ||
MinRK
|
r10568 | maxlen = minlen = -1 | ||
MinRK
|
r10566 | for i, seq in enumerate(sequences): | ||
try: | ||||
n = len(seq) | ||||
except Exception: | ||||
seq = list(seq) | ||||
MinRK
|
r10570 | if isinstance(sequences, tuple): | ||
# can't alter a tuple | ||||
sequences = list(sequences) | ||||
MinRK
|
r10566 | sequences[i] = seq | ||
n = len(seq) | ||||
MinRK
|
r10568 | if n > maxlen: | ||
maxlen = n | ||||
if minlen == -1 or n < minlen: | ||||
minlen = n | ||||
MinRK
|
r10566 | lens.append(n) | ||
MinRK
|
r3664 | # check that the length of sequences match | ||
MinRK
|
r10568 | if not self._mapping and minlen != maxlen: | ||
MinRK
|
r10566 | msg = 'all sequences must have equal length, but have %s' % lens | ||
raise ValueError(msg) | ||||
MinRK
|
r3664 | balanced = 'Balanced' in self.view.__class__.__name__ | ||
if balanced: | ||||
if self.chunksize: | ||||
MinRK
|
r10568 | nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0) | ||
MinRK
|
r3636 | else: | ||
MinRK
|
r10568 | nparts = maxlen | ||
MinRK
|
r3664 | targets = [None]*nparts | ||
MinRK
|
r3588 | else: | ||
MinRK
|
r3664 | if self.chunksize: | ||
warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) | ||||
MinRK
|
r3588 | # multiplexed: | ||
MinRK
|
r3664 | targets = self.view.targets | ||
MinRK
|
r5290 | # 'all' is lazily evaluated at execution time, which is now: | ||
if targets == 'all': | ||||
targets = client._build_targets(targets)[1] | ||||
MinRK
|
r6513 | elif isinstance(targets, int): | ||
# single-engine view, targets must be iterable | ||||
targets = [targets] | ||||
MinRK
|
r3636 | nparts = len(targets) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | msg_ids = [] | ||
MinRK
|
r3635 | for index, t in enumerate(targets): | ||
MinRK
|
r3588 | args = [] | ||
for seq in sequences: | ||||
MinRK
|
r10568 | part = self.mapObject.getPartition(seq, index, nparts, maxlen) | ||
args.append(part) | ||||
if not any(args): | ||||
MinRK
|
r3588 | continue | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10566 | if self._mapping: | ||
MinRK
|
r4155 | if sys.version_info[0] >= 3: | ||
f = lambda f, *sequences: list(map(f, *sequences)) | ||||
else: | ||||
f = map | ||||
MinRK
|
r10568 | args = [self.func] + args | ||
MinRK
|
r3588 | else: | ||
f=self.func | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | view = self.view if balanced else client[t] | ||
with view.temp_flags(block=False, **self.flags): | ||||
ar = view.apply(f, *args) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10568 | msg_ids.extend(ar.msg_ids) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10568 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, | ||
MinRK
|
r5821 | fname=getname(self.func), | ||
MinRK
|
r5171 | ordered=self.ordered | ||
) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | if self.block: | ||
MinRK
|
r3635 | try: | ||
return r.get() | ||||
except KeyboardInterrupt: | ||||
return r | ||||
MinRK
|
r3588 | else: | ||
return r | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3588 | def map(self, *sequences): | ||
MinRK
|
r10604 | """call a function on each element of one or more sequence(s) remotely. | ||
MinRK
|
r3644 | This should behave very much like the builtin map, but return an AsyncMapResult | ||
if self.block is False. | ||||
MinRK
|
r10604 | |||
That means it can take generators (will be cast to lists locally), | ||||
and mismatched sequence lengths will be padded with None. | ||||
MinRK
|
r3644 | """ | ||
MinRK
|
r10566 | # set _mapping as a flag for use inside self.__call__ | ||
self._mapping = True | ||||
MinRK
|
r3635 | try: | ||
MinRK
|
r10566 | ret = self(*sequences) | ||
MinRK
|
r3635 | finally: | ||
MinRK
|
r10566 | self._mapping = False | ||
MinRK
|
r3588 | return ret | ||
Thomas Kluyver
|
r3886 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] | ||