##// END OF EJS Templates
update payload source...
update payload source should be simple key, not Python (or worse, IPython) specific long key. I could have sworn I did this long ago, but apparently the source key was only updated in display_data, not execute_reply.

File last commit:

r10604:d5374c80
r11839:8cbae575
Show More
remotefunction.py
282 lines | 8.8 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Remote Functions and decorators for Views.
Authors:
* Brian Granger
* Min RK
"""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
fix sys import in remotefunction
r4159 import sys
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 import warnings
MinRK
notice nesting of `sync_results` decorator...
r10073 from IPython.external.decorator import decorator
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 from IPython.testing.skipdoctest import skip_doctest
MinRK
testing fixes
r3641
MinRK
eliminate relative imports
r3642 from . import map as Map
from .asyncresult import AsyncMapResult
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
#-----------------------------------------------------------------------------
MinRK
allow Reference as callable in map/apply...
r5821 # Functions and Decorators
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #-----------------------------------------------------------------------------
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 @skip_doctest
MinRK
update API after sagedays29...
r3664 def remote(view, block=None, **flags):
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 """Turn a function into a remote function.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 This method can be used for map:
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 In [1]: @remote(view,block=True)
MinRK
testing fixes
r3641 ...: def func(a):
...: pass
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 def remote_function(f):
MinRK
update API after sagedays29...
r3664 return RemoteFunction(view, f, block=block, **flags)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 return remote_function
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 @skip_doctest
MinRK
add unordered iteration to AsyncMapResults...
r5171 def parallel(view, dist='b', block=None, ordered=True, **flags):
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 """Turn a function into a parallel remote function.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 This method can be used for map:
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 In [1]: @parallel(view, block=True)
MinRK
testing fixes
r3641 ...: def func(a):
...: pass
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 def parallel_function(f):
MinRK
add unordered iteration to AsyncMapResults...
r5171 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 return parallel_function
MinRK
allow Reference as callable in map/apply...
r5821 def getname(f):
"""Get the name of an object.
For use in case of callables that are not functions, and
thus may not have __name__ defined.
Order: f.__name__ > f.name > str(f)
"""
try:
return f.__name__
except:
pass
try:
return f.name
except:
pass
return str(f)
MinRK
notice nesting of `sync_results` decorator...
r10073 @decorator
def sync_view_results(f, self, *args, **kwargs):
"""sync relevant results from self.client to our results attribute.
This is a clone of view.sync_results, but for remote functions
"""
view = self.view
if view._in_sync_results:
return f(self, *args, **kwargs)
print 'in sync results', f
view._in_sync_results = True
try:
ret = f(self, *args, **kwargs)
finally:
view._in_sync_results = False
view._sync_results()
return ret
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
"""Turn an existing function into a remote function.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 Parameters
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 view : View instance
The view to be used for execution
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 f : callable
The function to be wrapped into a remote function
block : bool [default: None]
Whether to wait for results or not. The default behavior is
MinRK
update API after sagedays29...
r3664 to use the current `block` attribute of `view`
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 **flags : remaining kwargs are passed to View.temp_flags
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 view = None # the remote connection
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 func = None # the wrapped function
block = None # whether to block
MinRK
update API after sagedays29...
r3664 flags = None # dict of extra kwargs for temp_flags
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def __init__(self, view, f, block=None, **flags):
self.view = view
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 self.func = f
self.block=block
MinRK
update API after sagedays29...
r3664 self.flags=flags
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 def __call__(self, *args, **kwargs):
MinRK
update API after sagedays29...
r3664 block = self.view.block if self.block is None else self.block
with self.view.temp_flags(block=block, **self.flags):
return self.view.apply(self.func, *args, **kwargs)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
class ParallelFunction(RemoteFunction):
MinRK
cleanup pass
r3644 """Class for mapping a function to sequences.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 This will distribute the sequences according the a mapper, and call
the function on each sub-sequence. If called via map, then the function
will be called once on each element, rather that each sub-sequence.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 Parameters
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 view : View instance
The view to be used for execution
MinRK
cleanup pass
r3644 f : callable
The function to be wrapped into a remote function
MinRK
update API after sagedays29...
r3664 dist : str [default: 'b']
The key for which mapObject to use to distribute sequences
options are:
* 'b' : use contiguous chunks in order
* 'r' : use round-robin striping
MinRK
cleanup pass
r3644 block : bool [default: None]
Whether to wait for results or not. The default behavior is
MinRK
update API after sagedays29...
r3664 to use the current `block` attribute of `view`
chunksize : int or None
MinRK
cleanup pass
r3644 The size of chunk to use when breaking up sequences in a load-balanced manner
MinRK
add unordered iteration to AsyncMapResults...
r5171 ordered : bool [default: True]
MinRK
allow view.map to accept unequal sequence sizes...
r10568 Whether the result should be kept in order. If False,
results become available as they arrive, regardless of submission order.
MinRK
update API after sagedays29...
r3664 **flags : remaining kwargs are passed to View.temp_flags
MinRK
cleanup pass
r3644 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow view.map to accept unequal sequence sizes...
r10568 chunksize = None
ordered = None
mapObject = None
MinRK
cast sequences without len to lists in map...
r10566 _mapping = False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add unordered iteration to AsyncMapResults...
r5171 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
MinRK
update API after sagedays29...
r3664 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
self.chunksize = chunksize
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.ordered = ordered
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 mapClass = Map.dists[dist]
self.mapObject = mapClass()
MinRK
cast sequences without len to lists in map...
r10566
MinRK
notice nesting of `sync_results` decorator...
r10073 @sync_view_results
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 def __call__(self, *sequences):
MinRK
handle targets='all' in remotefunction...
r5290 client = self.view.client
MinRK
cast sequences without len to lists in map...
r10566 lens = []
MinRK
allow view.map to accept unequal sequence sizes...
r10568 maxlen = minlen = -1
MinRK
cast sequences without len to lists in map...
r10566 for i, seq in enumerate(sequences):
try:
n = len(seq)
except Exception:
seq = list(seq)
MinRK
cast sequences to list if we need to alter it
r10570 if isinstance(sequences, tuple):
# can't alter a tuple
sequences = list(sequences)
MinRK
cast sequences without len to lists in map...
r10566 sequences[i] = seq
n = len(seq)
MinRK
allow view.map to accept unequal sequence sizes...
r10568 if n > maxlen:
maxlen = n
if minlen == -1 or n < minlen:
minlen = n
MinRK
cast sequences without len to lists in map...
r10566 lens.append(n)
MinRK
update API after sagedays29...
r3664 # check that the length of sequences match
MinRK
allow view.map to accept unequal sequence sizes...
r10568 if not self._mapping and minlen != maxlen:
MinRK
cast sequences without len to lists in map...
r10566 msg = 'all sequences must have equal length, but have %s' % lens
raise ValueError(msg)
MinRK
update API after sagedays29...
r3664 balanced = 'Balanced' in self.view.__class__.__name__
if balanced:
if self.chunksize:
MinRK
allow view.map to accept unequal sequence sizes...
r10568 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 else:
MinRK
allow view.map to accept unequal sequence sizes...
r10568 nparts = maxlen
MinRK
update API after sagedays29...
r3664 targets = [None]*nparts
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 else:
MinRK
update API after sagedays29...
r3664 if self.chunksize:
warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 # multiplexed:
MinRK
update API after sagedays29...
r3664 targets = self.view.targets
MinRK
handle targets='all' in remotefunction...
r5290 # 'all' is lazily evaluated at execution time, which is now:
if targets == 'all':
targets = client._build_targets(targets)[1]
MinRK
allow map / parallel function for single-engine views
r6513 elif isinstance(targets, int):
# single-engine view, targets must be iterable
targets = [targets]
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 nparts = len(targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 msg_ids = []
MinRK
API update involving map and load-balancing
r3635 for index, t in enumerate(targets):
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 args = []
for seq in sequences:
MinRK
allow view.map to accept unequal sequence sizes...
r10568 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
args.append(part)
if not any(args):
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 continue
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cast sequences without len to lists in map...
r10566 if self._mapping:
MinRK
update parallel code for py3k...
r4155 if sys.version_info[0] >= 3:
f = lambda f, *sequences: list(map(f, *sequences))
else:
f = map
MinRK
allow view.map to accept unequal sequence sizes...
r10568 args = [self.func] + args
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 else:
f=self.func
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 view = self.view if balanced else client[t]
with view.temp_flags(block=False, **self.flags):
ar = view.apply(f, *args)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow view.map to accept unequal sequence sizes...
r10568 msg_ids.extend(ar.msg_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow view.map to accept unequal sequence sizes...
r10568 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
MinRK
allow Reference as callable in map/apply...
r5821 fname=getname(self.func),
MinRK
add unordered iteration to AsyncMapResults...
r5171 ordered=self.ordered
)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 if self.block:
MinRK
API update involving map and load-balancing
r3635 try:
return r.get()
except KeyboardInterrupt:
return r
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 else:
return r
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 def map(self, *sequences):
MinRK
add note to map docstring
r10604 """call a function on each element of one or more sequence(s) remotely.
MinRK
cleanup pass
r3644 This should behave very much like the builtin map, but return an AsyncMapResult
if self.block is False.
MinRK
add note to map docstring
r10604
That means it can take generators (will be cast to lists locally),
and mismatched sequence lengths will be padded with None.
MinRK
cleanup pass
r3644 """
MinRK
cast sequences without len to lists in map...
r10566 # set _mapping as a flag for use inside self.__call__
self._mapping = True
MinRK
API update involving map and load-balancing
r3635 try:
MinRK
cast sequences without len to lists in map...
r10566 ret = self(*sequences)
MinRK
API update involving map and load-balancing
r3635 finally:
MinRK
cast sequences without len to lists in map...
r10566 self._mapping = False
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 return ret
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']