##// END OF EJS Templates
little bug fixed in pyout message print.
little bug fixed in pyout message print.

File last commit:

r5290:55d81f86
r5587:fdbcb3d8
Show More
remotefunction.py
222 lines | 7.0 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Remote Functions and decorators for Views.
Authors:
* Brian Granger
* Min RK
"""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
fix sys import in remotefunction
r4159 import sys
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 import warnings
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 from IPython.testing.skipdoctest import skip_doctest
MinRK
testing fixes
r3641
MinRK
eliminate relative imports
r3642 from . import map as Map
from .asyncresult import AsyncMapResult
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
#-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
def remote(view, block=None, **flags):
    """Turn a function into a remote function.

    The decorated function is replaced by a `RemoteFunction` bound to
    `view`, so that calling it submits the call through `view.apply`.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    block : bool [default: None]
        Whether to wait for results or not.  None defers to the `block`
        attribute of `view` at call time.
    **flags : remaining kwargs are passed to View.temp_flags

    This method can be used as a decorator:

    In [1]: @remote(view, block=True)
       ...: def func(a):
       ...:    pass
    """

    def remote_function(f):
        # `view`, `block` and `flags` are captured from the enclosing call
        return RemoteFunction(view, f, block=block, **flags)
    return remote_function
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
def parallel(view, dist='b', block=None, ordered=True, **flags):
    """Turn a function into a parallel remote function.

    The decorated function is replaced by a `ParallelFunction` bound to
    `view`.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    dist : str [default: 'b']
        The key selecting how sequences are distributed; forwarded
        unchanged to `ParallelFunction`.
    block : bool [default: None]
        Whether to wait for results or not; forwarded unchanged to
        `ParallelFunction`.
    ordered : bool [default: True]
        Forwarded unchanged to `ParallelFunction`.
    **flags : remaining kwargs are passed to View.temp_flags

    This method can be used for map:

    In [1]: @parallel(view, block=True)
       ...: def func(a):
       ...:    pass
    """

    def parallel_function(f):
        # all decorator arguments are captured from the enclosing call
        return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
    return parallel_function
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
    """Turn an existing function into a remote function.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    **flags : remaining kwargs are passed to View.temp_flags
    """

    view = None # the remote connection
    func = None # the wrapped function
    block = None # whether to block
    flags = None # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view = view
        self.func = f
        self.block = block
        self.flags = flags

    def __call__(self, *args, **kwargs):
        """Submit `self.func(*args, **kwargs)` through the view.

        Returns whatever `view.apply` returns while the view's flags are
        temporarily overridden with `block` and the stored `flags`.
        """
        # None means "follow the view", evaluated at call time rather than
        # when the RemoteFunction was constructed.
        block = self.view.block if self.block is None else self.block
        with self.view.temp_flags(block=block, **self.flags):
            return self.view.apply(self.func, *args, **kwargs)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a load-balanced manner
    ordered : bool [default: True]
        Passed on to the AsyncMapResult that collects the results.
        NOTE(review): presumably selects whether results are iterated in
        submission order -- confirm against AsyncMapResult.
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None
    ordered = None
    mapObject = None
    _mapping = False # True only while `map` is driving `__call__`

    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize
        self.ordered = ordered

        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def _plan_targets(self, length):
        """Return `(balanced, targets, nparts)` for sequences of `length` items."""
        # a view counts as load-balanced when its class name says so
        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # integer ceil(length / chunksize)
                nparts = length//self.chunksize + int(length%self.chunksize > 0)
            else:
                # no chunksize: one task per element
                nparts = length
            targets = [None]*nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed:
            targets = self.view.targets
            # 'all' is lazily evaluated at execution time, which is now:
            if targets == 'all':
                targets = self.view.client._build_targets(targets)[1]
            elif isinstance(targets, int):
                # a view on a single engine may carry a bare int; it must be
                # iterable for enumerate() and len() below
                targets = [targets]
            nparts = len(targets)
        return balanced, targets, nparts

    def __call__(self, *sequences):
        """Partition `sequences` across the view and apply the function.

        Returns the gathered result when blocking, otherwise the
        AsyncMapResult.  A KeyboardInterrupt while waiting also yields the
        AsyncMapResult so the caller can resume waiting later.

        Raises
        ------
        TypeError : no sequences were given
        ValueError : the sequences differ in length
        """
        if not sequences:
            raise TypeError("%s requires at least one sequence" % self.__class__.__name__)

        client = self.view.client

        # check that the length of sequences match
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
                raise ValueError(msg)

        balanced, targets, nparts = self._plan_targets(len_0)

        msg_ids = []
        for index, t in enumerate(targets):
            # skip empty partitions, so no task is submitted with nothing to do
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                if len(part) != 0:
                    args.append(part)
            if not args:
                continue

            if self._mapping:
                # element-wise: ship `map` itself and apply func per item
                if sys.version_info[0] >= 3:
                    # py3 map is lazy; force a list so a concrete result returns
                    f = lambda f, *sequences: list(map(f, *sequences))
                else:
                    f = map
                args = [self.func] + args
            else:
                f = self.func

            view = self.view if balanced else client[t]
            # always submit without blocking; waiting happens once, below
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.append(ar.msg_ids[0])

        # not every callable has __name__ (e.g. functools.partial)
        fname = getattr(self.func, '__name__', None) or str(self.func)
        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
                            fname=fname,
                            ordered=self.ordered
                        )

        # None means "follow the view", as documented and as in RemoteFunction
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                return r
        return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely.

        This should behave very much like the builtin map, but return an
        AsyncMapResult when not blocking.
        """
        # flag read inside self.__call__ to switch to element-wise application
        self._mapping = True
        try:
            return self.__call__(*sequences)
        finally:
            self._mapping = False
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']