##// END OF EJS Templates
Minor debug statement removed.
Minor debug statement removed.

File last commit:

r2526:ec6b47e5
r2869:8be76dcf
Show More
multienginefc.py
760 lines | 30.2 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
"""
Expose the multiengine controller over the Foolscap network protocol.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
from types import FunctionType
from zope.interface import Interface, implements
from twisted.internet import defer
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from twisted.python import components, failure
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Fixed foolscap imports....
r2526 try:
from foolscap.api import Referenceable
except ImportError:
from foolscap import Referenceable
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
from IPython.kernel import error
from IPython.kernel import map as Map
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 from IPython.kernel.parallelfunction import ParallelFunction
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.mapper import (
MultiEngineMapper,
IMultiEngineMapperFactory,
IMapper
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 from IPython.kernel.twistedutil import gatherBoth
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.kernel.multiengine import (
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 IMultiEngine,
IFullSynchronousMultiEngine,
ISynchronousMultiEngine)
from IPython.kernel.pendingdeferred import PendingDeferredManager
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.kernel.pickleutil import (
canDict,
canSequence, uncanDict, uncanSequence
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
# Needed to access the true globals from __main__.__dict__
import __main__
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
def packageResult(wrappedMethod):
def wrappedPackageResult(self, *args, **kwargs):
d = wrappedMethod(self, *args, **kwargs)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
return wrappedPackageResult
class IFCSynchronousMultiEngine(Interface):
"""Foolscap interface to `ISynchronousMultiEngine`.
The methods in this interface are similar to those of
`ISynchronousMultiEngine`, but their arguments and return values are pickled
if they are not already simple Python types that can be send over XML-RPC.
See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
documentation about the methods.
Most methods in this interface act like the `ISynchronousMultiEngine`
versions and can be called in blocking or non-blocking mode.
"""
pass
class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
"""Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
"""
implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
addSlash = True
def __init__(self, multiengine):
# Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
# it. This allow this class to do two adaptation steps.
self.smultiengine = ISynchronousMultiEngine(multiengine)
self._deferredIDCallbacks = {}
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def packageFailure(self, f):
f.cleanFailure()
return self.packageSuccess(f)
def packageSuccess(self, obj):
serial = pickle.dumps(obj, 2)
return serial
#---------------------------------------------------------------------------
# Things related to PendingDeferredManager
#---------------------------------------------------------------------------
@packageResult
def remote_get_pending_deferred(self, deferredID, block):
d = self.smultiengine.get_pending_deferred(deferredID, block)
try:
callback = self._deferredIDCallbacks.pop(deferredID)
except KeyError:
callback = None
if callback is not None:
d.addCallback(callback[0], *callback[1], **callback[2])
return d
@packageResult
def remote_clear_pending_deferreds(self):
return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
self._deferredIDCallbacks[did] = (callback, args, kwargs)
return did
Brian Granger
Messing with newlines.
r1829
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 #---------------------------------------------------------------------------
# IEngineMultiplexer related methods
#---------------------------------------------------------------------------
@packageResult
def remote_execute(self, lines, targets, block):
return self.smultiengine.execute(lines, targets=targets, block=block)
@packageResult
def remote_push(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.push(namespace, targets=targets, block=block)
return d
@packageResult
def remote_pull(self, keys, targets, block):
d = self.smultiengine.pull(keys, targets=targets, block=block)
return d
@packageResult
def remote_push_function(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
namespace = uncanDict(namespace)
d = self.smultiengine.push_function(namespace, targets=targets, block=block)
return d
def _canMultipleKeys(self, result):
return [canSequence(r) for r in result]
@packageResult
def remote_pull_function(self, keys, targets, block):
def can_functions(r, keys):
if len(keys)==1 or isinstance(keys, str):
result = canSequence(r)
elif len(keys)>1:
result = [canSequence(s) for s in r]
return result
d = self.smultiengine.pull_function(keys, targets=targets, block=block)
if block:
d.addCallback(can_functions, keys)
else:
d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
return d
@packageResult
def remote_push_serialized(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
return d
@packageResult
def remote_pull_serialized(self, keys, targets, block):
d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
return d
@packageResult
def remote_get_result(self, i, targets, block):
if i == 'None':
i = None
return self.smultiengine.get_result(i, targets=targets, block=block)
@packageResult
def remote_reset(self, targets, block):
return self.smultiengine.reset(targets=targets, block=block)
@packageResult
def remote_keys(self, targets, block):
return self.smultiengine.keys(targets=targets, block=block)
@packageResult
def remote_kill(self, controller, targets, block):
return self.smultiengine.kill(controller, targets=targets, block=block)
@packageResult
def remote_clear_queue(self, targets, block):
return self.smultiengine.clear_queue(targets=targets, block=block)
@packageResult
def remote_queue_status(self, targets, block):
return self.smultiengine.queue_status(targets=targets, block=block)
@packageResult
def remote_set_properties(self, binaryNS, targets, block):
try:
ns = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.set_properties(ns, targets=targets, block=block)
return d
@packageResult
def remote_get_properties(self, keys, targets, block):
if keys=='None':
keys=None
return self.smultiengine.get_properties(keys, targets=targets, block=block)
@packageResult
def remote_has_properties(self, keys, targets, block):
return self.smultiengine.has_properties(keys, targets=targets, block=block)
@packageResult
def remote_del_properties(self, keys, targets, block):
return self.smultiengine.del_properties(keys, targets=targets, block=block)
@packageResult
def remote_clear_properties(self, targets, block):
return self.smultiengine.clear_properties(targets=targets, block=block)
#---------------------------------------------------------------------------
# IMultiEngine related methods
#---------------------------------------------------------------------------
def remote_get_ids(self):
"""Get the ids of the registered engines.
This method always blocks.
"""
return self.smultiengine.get_ids()
#---------------------------------------------------------------------------
# IFCClientInterfaceProvider related methods
#---------------------------------------------------------------------------
def remote_get_client_name(self):
return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
# The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
# `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
# two phase adaptation.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
IMultiEngine, IFCSynchronousMultiEngine)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCFullSynchronousMultiEngineClient(object):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 implements(
IFullSynchronousMultiEngine,
IBlockingClientAdaptor,
IMultiEngineMapperFactory,
IMapper
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
def __init__(self, remote_reference):
self.remote_reference = remote_reference
self._deferredIDCallbacks = {}
# This class manages some pending deferreds through this instance. This
# is required for methods like gather/scatter as it enables us to
# create our own pending deferreds for composite operations.
self.pdm = PendingDeferredManager()
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def unpackage(self, r):
return pickle.loads(r)
#---------------------------------------------------------------------------
# Things related to PendingDeferredManager
#---------------------------------------------------------------------------
def get_pending_deferred(self, deferredID, block=True):
# Because we are managing some pending deferreds locally (through
# self.pdm) and some remotely (on the controller), we first try the
# local one and then the remote one.
if self.pdm.quick_has_id(deferredID):
d = self.pdm.get_pending_deferred(deferredID, block)
return d
else:
d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
d.addCallback(self.unpackage)
try:
callback = self._deferredIDCallbacks.pop(deferredID)
except KeyError:
callback = None
if callback is not None:
d.addCallback(callback[0], *callback[1], **callback[2])
return d
def clear_pending_deferreds(self):
# This clear both the local (self.pdm) and remote pending deferreds
self.pdm.clear_pending_deferreds()
d2 = self.remote_reference.callRemote('clear_pending_deferreds')
d2.addCallback(self.unpackage)
return d2
def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
self._deferredIDCallbacks[did] = (callback, args, kwargs)
return did
#---------------------------------------------------------------------------
# IEngineMultiplexer related methods
#---------------------------------------------------------------------------
Brian Granger
Messing with newlines.
r1829
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 def execute(self, lines, targets='all', block=True):
d = self.remote_reference.callRemote('execute', lines, targets, block)
d.addCallback(self.unpackage)
return d
def push(self, namespace, targets='all', block=True):
serial = pickle.dumps(namespace, 2)
d = self.remote_reference.callRemote('push', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('pull', keys, targets, block)
d.addCallback(self.unpackage)
return d
def push_function(self, namespace, targets='all', block=True):
cannedNamespace = canDict(namespace)
serial = pickle.dumps(cannedNamespace, 2)
d = self.remote_reference.callRemote('push_function', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull_function(self, keys, targets='all', block=True):
def uncan_functions(r, keys):
if len(keys)==1 or isinstance(keys, str):
return uncanSequence(r)
elif len(keys)>1:
return [uncanSequence(s) for s in r]
d = self.remote_reference.callRemote('pull_function', keys, targets, block)
if block:
d.addCallback(self.unpackage)
d.addCallback(uncan_functions, keys)
else:
d.addCallback(self.unpackage)
d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
return d
def push_serialized(self, namespace, targets='all', block=True):
cannedNamespace = canDict(namespace)
serial = pickle.dumps(cannedNamespace, 2)
d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull_serialized(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
d.addCallback(self.unpackage)
return d
def get_result(self, i=None, targets='all', block=True):
if i is None: # This is because None cannot be marshalled by xml-rpc
i = 'None'
d = self.remote_reference.callRemote('get_result', i, targets, block)
d.addCallback(self.unpackage)
return d
def reset(self, targets='all', block=True):
d = self.remote_reference.callRemote('reset', targets, block)
d.addCallback(self.unpackage)
return d
def keys(self, targets='all', block=True):
d = self.remote_reference.callRemote('keys', targets, block)
d.addCallback(self.unpackage)
return d
def kill(self, controller=False, targets='all', block=True):
d = self.remote_reference.callRemote('kill', controller, targets, block)
d.addCallback(self.unpackage)
return d
def clear_queue(self, targets='all', block=True):
d = self.remote_reference.callRemote('clear_queue', targets, block)
d.addCallback(self.unpackage)
return d
def queue_status(self, targets='all', block=True):
d = self.remote_reference.callRemote('queue_status', targets, block)
d.addCallback(self.unpackage)
return d
def set_properties(self, properties, targets='all', block=True):
serial = pickle.dumps(properties, 2)
d = self.remote_reference.callRemote('set_properties', serial, targets, block)
d.addCallback(self.unpackage)
return d
def get_properties(self, keys=None, targets='all', block=True):
if keys==None:
keys='None'
d = self.remote_reference.callRemote('get_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def has_properties(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('has_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def del_properties(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('del_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def clear_properties(self, targets='all', block=True):
d = self.remote_reference.callRemote('clear_properties', targets, block)
d.addCallback(self.unpackage)
return d
#---------------------------------------------------------------------------
# IMultiEngine related methods
#---------------------------------------------------------------------------
def get_ids(self):
d = self.remote_reference.callRemote('get_ids')
return d
#---------------------------------------------------------------------------
# ISynchronousMultiEngineCoordinator related methods
#---------------------------------------------------------------------------
def _process_targets(self, targets):
def create_targets(ids):
if isinstance(targets, int):
engines = [targets]
elif targets=='all':
engines = ids
elif isinstance(targets, (list, tuple)):
engines = targets
for t in engines:
if not t in ids:
raise error.InvalidEngineID("engine with id %r does not exist"%t)
return engines
d = self.get_ids()
d.addCallback(create_targets)
return d
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
# Note: scatter and gather handle pending deferreds locally through self.pdm.
# This enables us to collect a bunch fo deferred ids and make a secondary
# deferred id that corresponds to the entire group. This logic is extremely
# difficult to get right though.
def do_scatter(engines):
nEngines = len(engines)
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 mapClass = Map.dists[dist]
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 mapObject = mapClass()
d_list = []
# Loop through and push to each engine in non-blocking mode.
# This returns a set of deferreds to deferred_ids
for index, engineid in enumerate(engines):
partition = mapObject.getPartition(seq, index, nEngines)
if flatten and len(partition) == 1:
d = self.push({key: partition[0]}, targets=engineid, block=False)
else:
d = self.push({key: partition}, targets=engineid, block=False)
d_list.append(d)
# Collect the deferred to deferred_ids
d = gatherBoth(d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
# Now d has a list of deferred_ids or Failures coming
d.addCallback(error.collect_exceptions, 'scatter')
def process_did_list(did_list):
"""Turn a list of deferred_ids into a final result or failure."""
new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
final_d = gatherBoth(new_d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
final_d.addCallback(error.collect_exceptions, 'scatter')
final_d.addCallback(lambda lop: [i[0] for i in lop])
return final_d
# Now, depending on block, we need to handle the list deferred_ids
# coming down the pipe diferently.
if block:
# If we are blocking register a callback that will transform the
# list of deferred_ids into the final result.
d.addCallback(process_did_list)
return d
else:
# Here we are going to use a _local_ PendingDeferredManager.
deferred_id = self.pdm.get_deferred_id()
# This is the deferred we will return to the user that will fire
# with the local deferred_id AFTER we have received the list of
# primary deferred_ids
d_to_return = defer.Deferred()
def do_it(did_list):
"""Produce a deferred to the final result, but first fire the
deferred we will return to the user that has the local
deferred id."""
d_to_return.callback(deferred_id)
return process_did_list(did_list)
d.addCallback(do_it)
# Now save the deferred to the final result
self.pdm.save_pending_deferred(d, deferred_id)
return d_to_return
d = self._process_targets(targets)
d.addCallback(do_scatter)
return d
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 def gather(self, key, dist='b', targets='all', block=True):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
# Note: scatter and gather handle pending deferreds locally through self.pdm.
# This enables us to collect a bunch fo deferred ids and make a secondary
# deferred id that corresponds to the entire group. This logic is extremely
# difficult to get right though.
def do_gather(engines):
nEngines = len(engines)
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 mapClass = Map.dists[dist]
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 mapObject = mapClass()
d_list = []
# Loop through and push to each engine in non-blocking mode.
# This returns a set of deferreds to deferred_ids
for index, engineid in enumerate(engines):
d = self.pull(key, targets=engineid, block=False)
d_list.append(d)
# Collect the deferred to deferred_ids
d = gatherBoth(d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
# Now d has a list of deferred_ids or Failures coming
d.addCallback(error.collect_exceptions, 'scatter')
def process_did_list(did_list):
"""Turn a list of deferred_ids into a final result or failure."""
new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
final_d = gatherBoth(new_d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
final_d.addCallback(error.collect_exceptions, 'gather')
final_d.addCallback(lambda lop: [i[0] for i in lop])
final_d.addCallback(mapObject.joinPartitions)
return final_d
# Now, depending on block, we need to handle the list deferred_ids
# coming down the pipe diferently.
if block:
# If we are blocking register a callback that will transform the
# list of deferred_ids into the final result.
d.addCallback(process_did_list)
return d
else:
# Here we are going to use a _local_ PendingDeferredManager.
deferred_id = self.pdm.get_deferred_id()
# This is the deferred we will return to the user that will fire
# with the local deferred_id AFTER we have received the list of
# primary deferred_ids
d_to_return = defer.Deferred()
def do_it(did_list):
"""Produce a deferred to the final result, but first fire the
deferred we will return to the user that has the local
deferred id."""
d_to_return.callback(deferred_id)
return process_did_list(did_list)
d.addCallback(do_it)
# Now save the deferred to the final result
self.pdm.save_pending_deferred(d, deferred_id)
return d_to_return
d = self._process_targets(targets)
d.addCallback(do_gather)
return d
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
Brian E Granger
Initial work on updating the frontend APIs. Also fixed a few bugs in ...
r1344 """
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 A parallelized version of Python's builtin map.
This has a slightly different syntax than the builtin `map`.
This is needed because we need to have keyword arguments and thus
can't use *args to capture all the sequences. Instead, they must
be passed in a list or tuple.
raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
Most users will want to use parallel functions or the `mapper`
and `map` methods for an API that follows that of the builtin
`map`.
Brian E Granger
Initial work on updating the frontend APIs. Also fixed a few bugs in ...
r1344 """
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 if not isinstance(sequences, (list, tuple)):
raise TypeError('sequences must be a list or tuple')
max_len = max(len(s) for s in sequences)
for s in sequences:
if len(s)!=max_len:
raise ValueError('all sequences must have equal length')
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 if isinstance(func, FunctionType):
d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
d.addCallback(lambda did: self.get_pending_deferred(did, True))
Brian E Granger
Initial work on updating the frontend APIs. Also fixed a few bugs in ...
r1344 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 elif isinstance(func, str):
d = defer.succeed(None)
sourceToRun = \
Brian E Granger
Initial work on updating the frontend APIs. Also fixed a few bugs in ...
r1344 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 else:
raise TypeError("func must be a function or str")
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
d.addCallback(lambda did: self.get_pending_deferred(did, True))
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 return d
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 def map(self, func, *sequences):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
A parallel version of Python's builtin `map` function.
This method applies a function to sequences of arguments. It
follows the same syntax as the builtin `map`.
This method creates a mapper objects by calling `self.mapper` with
no arguments and then uses that mapper to do the mapping. See
the documentation of `mapper` for more details.
"""
return self.mapper().map(func, *sequences)
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
def mapper(self, dist='b', targets='all', block=True):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Create a mapper object that has a `map` method.
This method returns an object that implements the `IMapper`
interface. This method is a factory that is used to control how
the map happens.
:Parameters:
dist : str
What decomposition to use, 'b' is the only one supported
currently
targets : str, int, sequence of ints
Which engines to use for the map
block : boolean
Should calls to `map` block or not
"""
return MultiEngineMapper(self, dist, targets, block)
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
def parallel(self, dist='b', targets='all', block=True):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
A decorator that turns a function into a parallel function.
This can be used as:
@parallel()
def f(x, y)
...
f(range(10), range(10))
This causes f(0,0), f(1,1), ... to be called in parallel.
:Parameters:
dist : str
What decomposition to use, 'b' is the only one supported
currently
targets : str, int, sequence of ints
Which engines to use for the map
block : boolean
Should calls to `map` block or not
"""
mapper = self.mapper(dist, targets, block)
pf = ParallelFunction(mapper)
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 return pf
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 #---------------------------------------------------------------------------
# ISynchronousMultiEngineExtras related methods
#---------------------------------------------------------------------------
def _transformPullResult(self, pushResult, multitargets, lenKeys):
if not multitargets:
result = pushResult[0]
elif lenKeys > 1:
result = zip(*pushResult)
elif lenKeys is 1:
result = list(pushResult)
return result
def zip_pull(self, keys, targets='all', block=True):
multitargets = not isinstance(targets, int) and len(targets) > 1
lenKeys = len(keys)
d = self.pull(keys, targets=targets, block=block)
if block:
d.addCallback(self._transformPullResult, multitargets, lenKeys)
else:
d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
return d
def run(self, fname, targets='all', block=True):
fileobj = open(fname,'r')
source = fileobj.read()
fileobj.close()
# if the compilation blows, we get a local error right away
try:
code = compile(source,fname,'exec')
except:
return defer.fail(failure.Failure())
# Now run the code
d = self.execute(source, targets=targets, block=block)
return d
#---------------------------------------------------------------------------
# IBlockingClientAdaptor related methods
#---------------------------------------------------------------------------
def adapt_to_blocking_client(self):
from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
return IFullBlockingMultiEngineClient(self)