##// END OF EJS Templates
various db backend fixes...
various db backend fixes * use index on msg_id in mongodb backend (_table prevented some methods from working outside the session) * purge_request improved to use fewer db calls * mongodb testcase split into its own file * Fix equality testing, NULL handling, in SQLiteDB backend

File last commit:

r3673:b9f54806
r3875:ffe043d4
Show More
remotefunction.py
199 lines | 6.6 KiB | text/x-python | PythonLexer
MinRK
update API after sagedays29...
r3664 """Remote Functions and decorators for Views."""
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588 #-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 import warnings
MinRK
testing fixes
r3641 from IPython.testing import decorators as testdec
MinRK
eliminate relative imports
r3642 from . import map as Map
from .asyncresult import AsyncMapResult
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
#-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
testing fixes
@testdec.skip_doctest
def remote(view, block=None, **flags):
    """Build a decorator that wraps a function in a `RemoteFunction`.

    The decorated function, `view`, `block` and any extra keyword `flags`
    are handed unchanged to the `RemoteFunction` constructor.

    Usage:

    In [1]: @remote(view,block=True)
       ...: def func(a):
       ...:    pass
    """

    def remote_function(func):
        # Construction is deferred until the decorator is actually applied.
        wrapped = RemoteFunction(view, func, block=block, **flags)
        return wrapped

    return remote_function
MinRK
testing fixes
@testdec.skip_doctest
def parallel(view, dist='b', block=None, **flags):
    """Build a decorator that wraps a function in a `ParallelFunction`.

    The decorated function, `view`, `dist`, `block` and any extra keyword
    `flags` are handed unchanged to the `ParallelFunction` constructor.
    The resulting object can be used for map.

    Usage:

    In [1]: @parallel(view, block=True)
       ...: def func(a):
       ...:    pass
    """

    def parallel_function(func):
        # Construction is deferred until the decorator is actually applied.
        wrapped = ParallelFunction(view, func, dist=dist, block=block, **flags)
        return wrapped

    return parallel_function
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
    """Callable that executes a wrapped function through a view.

    Parameters
    ----------

    view : View instance
        The view used for execution; its `temp_flags` and `apply` methods
        are called on every invocation.
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  When left as None, the
        `block` attribute of `view` is read at call time instead.
    **flags : remaining kwargs are passed to View.temp_flags
    """

    view = None   # the remote connection
    func = None   # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the view and return its result."""
        # An explicit per-function setting wins; otherwise defer to the view.
        if self.block is None:
            block = self.view.block
        else:
            block = self.block

        with self.view.temp_flags(block=block, **self.flags):
            return self.view.apply(self.func, *args, **kwargs)
MinRK
split pendingresult and remotefunction into own files, add view.map.
r3588
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a
        load-balanced manner
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None  # partition size, only honoured when load balancing
    mapObject = None  # instance of the class looked up in Map.dists

    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize

        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences`, submit one task per partition, gather results.

        Returns the gathered result when blocking, otherwise an
        `AsyncMapResult`.  If the blocking wait is interrupted with
        KeyboardInterrupt, the `AsyncMapResult` is returned instead.

        Raises ValueError if the sequences do not all have the same length.
        """
        # check that the length of sequences match
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i' % (len_0, len(s))
                raise ValueError(msg)

        # The view kind is detected from its class name.
        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # Ceiling division.  `//` keeps nparts an int even under
                # true division, where `/` would give a float and break
                # the list multiplication below.
                nparts = len_0 // self.chunksize + int(len_0 % self.chunksize > 0)
            else:
                nparts = len_0
            targets = [None] * nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed: one partition per target of the view
            targets = self.view.targets
            nparts = len(targets)

        msg_ids = []
        client = self.view.client
        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                if len(part) == 0:
                    continue
                else:
                    args.append(part)
            if not args:
                # nothing landed in this partition; submit no task for it
                continue

            if hasattr(self, '_map'):
                # Invoked through self.map(): apply func element-wise by
                # shipping the builtin map with func as its first argument.
                f = map
                args = [self.func] + args
            else:
                f = self.func

            # Balanced views submit through the view itself; otherwise each
            # partition goes to whatever client[t] returns for its target.
            view = self.view if balanced else client[t]
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.append(ar.msg_ids[0])

        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)

        # Fall back to the view's setting when no explicit `block` was given,
        # as documented and as RemoteFunction.__call__ does.
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely.

        This should behave very much like the builtin map, but return an
        AsyncMapResult when not blocking.
        """
        # set _map as a flag for use inside self.__call__
        self._map = True
        try:
            ret = self.__call__(*sequences)
        finally:
            # always clear the flag so later direct calls are unaffected
            del self._map
        return ret
MinRK
cleanup pass
# Names exported by `from <module> import *`.
__all__ = [
    'remote',
    'parallel',
    'RemoteFunction',
    'ParallelFunction',
]