##// END OF EJS Templates
remove debug statements from Scheduler...
remove debug statements from Scheduler These can be expensive under high load, and since the Scheduler is no longer changing, they should be removed prior to 0.11 release.

File last commit:

r4018:9950e71b
r4140:4000009d
Show More
remotefunction.py
206 lines | 6.6 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Remote Functions and decorators for Views.
Authors:
* Brian Granger
* Min RK
"""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 import warnings
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
r3886 from IPython.testing.skipdoctest import skip_doctest
MinRK
testing fixes
r3641
MinRK
eliminate relative imports
r3642 from . import map as Map
from .asyncresult import AsyncMapResult
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
#-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
def remote(view, block=None, **flags):
    """Build a decorator that wraps a function in a `RemoteFunction`.

    Parameters
    ----------
    view : View instance
        Handed to `RemoteFunction` as the view used for execution.
    block : bool [default: None]
        Handed to `RemoteFunction` unchanged.
    **flags : remaining kwargs are forwarded to `RemoteFunction`.

    Returns
    -------
    A one-argument callable; applying it to a function `f` returns
    ``RemoteFunction(view, f, block=block, **flags)``.

    Typical use is as a decorator:

    In [1]: @remote(view, block=True)
       ...: def func(a):
       ...:     pass
    """

    def remote_function(f):
        # `view`, `block` and `flags` are captured from the enclosing call.
        wrapped = RemoteFunction(view, f, block=block, **flags)
        return wrapped

    return remote_function
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
@skip_doctest
def parallel(view, dist='b', block=None, **flags):
    """Build a decorator that wraps a function in a `ParallelFunction`.

    Parameters
    ----------
    view : View instance
        Handed to `ParallelFunction` as the view used for execution.
    dist : str [default: 'b']
        Handed to `ParallelFunction` as its `dist` key.
    block : bool [default: None]
        Handed to `ParallelFunction` unchanged.
    **flags : remaining kwargs are forwarded to `ParallelFunction`.

    Returns
    -------
    A one-argument callable; applying it to a function `f` returns
    ``ParallelFunction(view, f, dist=dist, block=block, **flags)``.

    Typical use is as a decorator:

    In [1]: @parallel(view, block=True)
       ...: def func(a):
       ...:     pass
    """

    def parallel_function(f):
        # `view`, `dist`, `block` and `flags` come from the enclosing call.
        wrapped = ParallelFunction(view, f, dist=dist, block=block, **flags)
        return wrapped

    return parallel_function
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
    """Callable wrapper that runs an existing function through a view.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  When left as None, the
        `block` attribute of `view` at call time is used instead.
    **flags : remaining kwargs are passed to View.temp_flags
    """

    # Class-level defaults; each is overwritten per instance in __init__.
    view = None   # the remote connection
    func = None   # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the view and return what
        `view.apply` returns."""
        target = self.view
        should_block = self.block
        if should_block is None:
            # No explicit preference: defer to the view's current setting.
            should_block = target.block
        with target.temp_flags(block=should_block, **self.flags):
            return target.apply(self.func, *args, **kwargs)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a
        load-balanced manner
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None
    mapObject = None

    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize

        # `dist` selects the partitioning class from the Map registry.
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences`, submit one task per partition, and collect.

        Returns the `AsyncMapResult` itself when not blocking (or when a
        blocking wait is interrupted with Ctrl-C), otherwise the value of
        its `get()`.

        Raises
        ------
        ValueError
            If the sequences do not all have the same length.
        """
        # check that the length of sequences match
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i' % (len_0, len(s))
                raise ValueError(msg)

        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # Ceiling division.  Floor division (`//`) keeps nparts an
                # int even under true division, where `/` would produce a
                # float and break `[None]*nparts` below.
                nparts = len_0 // self.chunksize + int(len_0 % self.chunksize > 0)
            else:
                nparts = len_0
            targets = [None] * nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed:
            targets = self.view.targets
            nparts = len(targets)

        msg_ids = []
        client = self.view.client
        # `map()` sets `_map` on the instance for the duration of the call.
        mapping = hasattr(self, '_map')
        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                if len(part) != 0:
                    args.append(part)
            if not args:
                # nothing landed in this partition; do not submit a task
                continue

            if mapping:
                # Element-wise mode: run builtin map over the partition.
                # NOTE(review): on Python 3 `map` returns a lazy iterator;
                # confirm the result still serializes back as intended.
                f = map
                args = [self.func] + args
            else:
                f = self.func

            view = self.view if balanced else client[t]
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.append(ar.msg_ids[0])

        # Callables such as functools.partial objects have no __name__;
        # fall back to their string form rather than raising.
        fname = getattr(self.func, '__name__', None) or str(self.func)
        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=fname)

        # As documented above (and as RemoteFunction.__call__ does), an
        # unset `block` defers to the view's current `block` attribute.
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                # hand back the pending result so the caller can retry
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely.
        This should behave very much like the builtin map, but return an AsyncMapResult
        if self.block is False.
        """
        # set _map as a flag for use inside self.__call__
        self._map = True
        try:
            ret = self.__call__(*sequences)
        finally:
            del self._map
        return ret
Thomas Kluyver
Move skip_doctest decorator to separate module, so that it can be used without triggering other imports.
# Public names exported by this module.
__all__ = [
    'remote',
    'parallel',
    'RemoteFunction',
    'ParallelFunction',
]