multienginefc.py
760 lines
| 30.2 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
""" | ||||
Expose the multiengine controller over the Foolscap network protocol. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import cPickle as pickle | ||||
from types import FunctionType | ||||
from zope.interface import Interface, implements | ||||
from twisted.internet import defer | ||||
Brian Granger
|
r2498 | from twisted.python import components, failure | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2526 | try: | ||
from foolscap.api import Referenceable | ||||
except ImportError: | ||||
from foolscap import Referenceable | ||||
Brian E Granger
|
r1234 | |||
from IPython.kernel import error | ||||
from IPython.kernel import map as Map | ||||
Brian E Granger
|
r1346 | from IPython.kernel.parallelfunction import ParallelFunction | ||
Brian E Granger
|
r1395 | from IPython.kernel.mapper import ( | ||
MultiEngineMapper, | ||||
IMultiEngineMapperFactory, | ||||
IMapper | ||||
) | ||||
Brian E Granger
|
r1234 | from IPython.kernel.twistedutil import gatherBoth | ||
Brian Granger
|
r2498 | from IPython.kernel.multiengine import ( | ||
Brian E Granger
|
r1234 | IMultiEngine, | ||
IFullSynchronousMultiEngine, | ||||
ISynchronousMultiEngine) | ||||
from IPython.kernel.pendingdeferred import PendingDeferredManager | ||||
Brian Granger
|
r2498 | from IPython.kernel.pickleutil import ( | ||
canDict, | ||||
canSequence, uncanDict, uncanSequence | ||||
) | ||||
Brian E Granger
|
r1234 | |||
from IPython.kernel.clientinterfaces import ( | ||||
IFCClientInterfaceProvider, | ||||
IBlockingClientAdaptor | ||||
) | ||||
# Needed to access the true globals from __main__.__dict__ | ||||
import __main__ | ||||
#------------------------------------------------------------------------------- | ||||
# The Controller side of things | ||||
#------------------------------------------------------------------------------- | ||||
def packageResult(wrappedMethod):
    """Decorate a deferred-returning method so that its outcome is packaged.

    The wrapper calls `wrappedMethod`, then chains the instance's
    `packageSuccess` (callback) and `packageFailure` (errback) onto the
    deferred it returned, and hands that same deferred back to the caller.
    """
    def wrappedPackageResult(self, *args, **kwargs):
        outcome = wrappedMethod(self, *args, **kwargs)
        # The errback is attached after the callback, so it also sees a
        # failure raised while packaging a successful result.
        outcome.addCallback(self.packageSuccess)
        outcome.addErrback(self.packageFailure)
        return outcome
    return wrappedPackageResult
class IFCSynchronousMultiEngine(Interface):
    """Foolscap interface to `ISynchronousMultiEngine`.

    The methods mirror those of `ISynchronousMultiEngine`, except that
    arguments and return values are pickled whenever they are not already
    simple Python types that can be sent over XML-RPC.

    Refer to `ISynchronousMultiEngine` and `IMultiEngine` for the
    documentation of the individual methods.

    Like their `ISynchronousMultiEngine` counterparts, most methods can be
    called in either blocking or non-blocking mode.
    """
class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
    """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.

    The ``remote_<name>`` methods are the ones the client class in this
    module invokes through ``callRemote('<name>', ...)``.  Those decorated
    with `packageResult` return a deferred whose result, or failure, is
    pickled (protocol 2) before it is handed back.
    """

    implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)

    addSlash = True

    def __init__(self, multiengine):
        # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
        # it.  This allows this class to do two adaptation steps.
        self.smultiengine = ISynchronousMultiEngine(multiengine)
        # deferredID -> (callback, args, kwargs); consumed by
        # `remote_get_pending_deferred` to post-process a pending result.
        self._deferredIDCallbacks = {}

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        """Clean the failure `f` and pickle it like a normal result."""
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        """Return `obj` pickled with protocol 2."""
        serial = pickle.dumps(obj, 2)
        return serial

    def _callWithUnpickled(self, method, binaryNS, targets, block, transform=None):
        """Unpickle `binaryNS` and pass it to `method`.

        :Parameters:
            method : callable
                Called as ``method(obj, targets=targets, block=block)``.
            binaryNS : str
                Pickled payload received from the client.
            transform : callable or None
                Optional function applied to the unpickled object before it
                is handed to `method`.

        Returns whatever `method` returns, or an already-failed deferred when
        unpickling raises.
        """
        # NOTE(security): unpickling can execute arbitrary code for a crafted
        # payload, so this is only safe if every client able to reach this
        # object is trusted.
        try:
            obj = pickle.loads(binaryNS)
        except:
            # Deliberately broad: whatever unpickling raised is captured as a
            # Failure so it travels back to the caller through the deferred.
            return defer.fail(failure.Failure())
        if transform is not None:
            obj = transform(obj)
        return method(obj, targets=targets, block=block)

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    @packageResult
    def remote_get_pending_deferred(self, deferredID, block):
        """Fetch a pending deferred, applying any post-processing callback
        registered for `deferredID` through `_addDeferredIDCallback`."""
        d = self.smultiengine.get_pending_deferred(deferredID, block)
        try:
            callback = self._deferredIDCallbacks.pop(deferredID)
        except KeyError:
            callback = None
        if callback is not None:
            d.addCallback(callback[0], *callback[1], **callback[2])
        return d

    @packageResult
    def remote_clear_pending_deferreds(self):
        return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Remember `callback` (with its arguments) for deferred id `did`.

        Returns `did` unchanged so this can be used as a deferred callback.
        """
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    @packageResult
    def remote_execute(self, lines, targets, block):
        return self.smultiengine.execute(lines, targets=targets, block=block)

    @packageResult
    def remote_push(self, binaryNS, targets, block):
        return self._callWithUnpickled(self.smultiengine.push,
                                       binaryNS, targets, block)

    @packageResult
    def remote_pull(self, keys, targets, block):
        d = self.smultiengine.pull(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_push_function(self, binaryNS, targets, block):
        # The client cans the namespace before pickling it, so it is uncanned
        # here once it has been unpickled.
        return self._callWithUnpickled(self.smultiengine.push_function,
                                       binaryNS, targets, block,
                                       transform=uncanDict)

    def _canMultipleKeys(self, result):
        # Not referenced within this class; kept for backward compatibility.
        return [canSequence(r) for r in result]

    @packageResult
    def remote_pull_function(self, keys, targets, block):
        def can_functions(r, keys):
            # A single key (a bare string or a one-element sequence) means
            # `r` is canned as one sequence; otherwise each element of `r` is
            # canned separately.  An empty `keys` takes the second branch
            # rather than failing on an unbound local.
            if isinstance(keys, str) or len(keys) == 1:
                return canSequence(r)
            return [canSequence(s) for s in r]

        d = self.smultiengine.pull_function(keys, targets=targets, block=block)
        if block:
            d.addCallback(can_functions, keys)
        else:
            # In non-blocking mode the deferred fires with a deferred id, so
            # the canning is postponed until that id is retrieved.
            d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
        return d

    @packageResult
    def remote_push_serialized(self, binaryNS, targets, block):
        return self._callWithUnpickled(self.smultiengine.push_serialized,
                                       binaryNS, targets, block)

    @packageResult
    def remote_pull_serialized(self, keys, targets, block):
        d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_result(self, i, targets, block):
        # The client sends the string 'None' in place of None.
        if i == 'None':
            i = None
        return self.smultiengine.get_result(i, targets=targets, block=block)

    @packageResult
    def remote_reset(self, targets, block):
        return self.smultiengine.reset(targets=targets, block=block)

    @packageResult
    def remote_keys(self, targets, block):
        return self.smultiengine.keys(targets=targets, block=block)

    @packageResult
    def remote_kill(self, controller, targets, block):
        return self.smultiengine.kill(controller, targets=targets, block=block)

    @packageResult
    def remote_clear_queue(self, targets, block):
        return self.smultiengine.clear_queue(targets=targets, block=block)

    @packageResult
    def remote_queue_status(self, targets, block):
        return self.smultiengine.queue_status(targets=targets, block=block)

    @packageResult
    def remote_set_properties(self, binaryNS, targets, block):
        return self._callWithUnpickled(self.smultiengine.set_properties,
                                       binaryNS, targets, block)

    @packageResult
    def remote_get_properties(self, keys, targets, block):
        # The client sends the string 'None' in place of None.
        if keys == 'None':
            keys = None
        return self.smultiengine.get_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_has_properties(self, keys, targets, block):
        return self.smultiengine.has_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_del_properties(self, keys, targets, block):
        return self.smultiengine.del_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_clear_properties(self, targets, block):
        return self.smultiengine.clear_properties(targets=targets, block=block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def remote_get_ids(self):
        """Get the ids of the registered engines.

        This method always blocks.  Unlike most remote methods it is not
        wrapped in `packageResult`, so its result is not pickled here.
        """
        return self.smultiengine.get_ids()

    #---------------------------------------------------------------------------
    # IFCClientInterfaceProvider related methods
    #---------------------------------------------------------------------------

    def remote_get_client_name(self):
        """Return the dotted name of the client class matching this object."""
        return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
# `FCSynchronousMultiEngineFromMultiEngine.__init__` first adapts the
# `IMultiEngine` it receives to `ISynchronousMultiEngine`, so registering it
# as the `IMultiEngine` -> `IFCSynchronousMultiEngine` adapter really performs
# a two phase adaptation.
components.registerAdapter(
    FCSynchronousMultiEngineFromMultiEngine,
    IMultiEngine,
    IFCSynchronousMultiEngine)
#------------------------------------------------------------------------------- | ||||
# The Client side of things | ||||
#------------------------------------------------------------------------------- | ||||
class FCFullSynchronousMultiEngineClient(object):
    """Client-side counterpart of `FCSynchronousMultiEngineFromMultiEngine`.

    Each method forwards to the controller through
    ``remote_reference.callRemote`` and returns a deferred.  Replies from
    packaged remote methods are unpickled by `unpackage`.  Composite
    operations (`scatter`, `gather`, `raw_map`) are built on top of the
    primitive calls and track their pending deferreds locally in `self.pdm`.
    """

    implements(
        IFullSynchronousMultiEngine,
        IBlockingClientAdaptor,
        IMultiEngineMapperFactory,
        IMapper
    )

    def __init__(self, remote_reference):
        self.remote_reference = remote_reference
        # deferredID -> (callback, args, kwargs); consumed by
        # `get_pending_deferred` to post-process a remote pending result.
        self._deferredIDCallbacks = {}
        # This class manages some pending deferreds through this instance. This
        # is required for methods like gather/scatter as it enables us to
        # create our own pending deferreds for composite operations.
        self.pdm = PendingDeferredManager()

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        """Unpickle the reply `r` received from the controller."""
        return pickle.loads(r)

    def _call_and_unpackage(self, name, *args):
        """Call remote method `name` with `args` and unpickle its reply.

        Returns the deferred from ``callRemote`` with `unpackage` attached as
        its first callback.
        """
        d = self.remote_reference.callRemote(name, *args)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    def get_pending_deferred(self, deferredID, block=True):
        """Retrieve the pending deferred registered under `deferredID`."""
        # Because we are managing some pending deferreds locally (through
        # self.pdm) and some remotely (on the controller), we first try the
        # local one and then the remote one.
        if self.pdm.quick_has_id(deferredID):
            d = self.pdm.get_pending_deferred(deferredID, block)
            return d
        else:
            d = self._call_and_unpackage('get_pending_deferred', deferredID, block)
            try:
                callback = self._deferredIDCallbacks.pop(deferredID)
            except KeyError:
                callback = None
            if callback is not None:
                d.addCallback(callback[0], *callback[1], **callback[2])
            return d

    def clear_pending_deferreds(self):
        """Clear both the local (self.pdm) and the remote pending deferreds."""
        self.pdm.clear_pending_deferreds()
        return self._call_and_unpackage('clear_pending_deferreds')

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Remember `callback` (with its arguments) for deferred id `did`.

        Returns `did` unchanged so this can be used as a deferred callback.
        """
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all', block=True):
        return self._call_and_unpackage('execute', lines, targets, block)

    def push(self, namespace, targets='all', block=True):
        serial = pickle.dumps(namespace, 2)
        return self._call_and_unpackage('push', serial, targets, block)

    def pull(self, keys, targets='all', block=True):
        return self._call_and_unpackage('pull', keys, targets, block)

    def push_function(self, namespace, targets='all', block=True):
        # The namespace is canned before it is pickled; the controller uncans
        # it after unpickling.
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        return self._call_and_unpackage('push_function', serial, targets, block)

    def pull_function(self, keys, targets='all', block=True):
        def uncan_functions(r, keys):
            # A single key (a bare string or a one-element sequence) means
            # `r` is uncanned as one sequence; otherwise each element of `r`
            # is uncanned separately.  An empty `keys` takes the second
            # branch instead of silently yielding None.
            if isinstance(keys, str) or len(keys) == 1:
                return uncanSequence(r)
            return [uncanSequence(s) for s in r]

        d = self._call_and_unpackage('pull_function', keys, targets, block)
        if block:
            d.addCallback(uncan_functions, keys)
        else:
            # In non-blocking mode the deferred fires with a deferred id, so
            # the uncanning is postponed until that id is retrieved.
            d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
        return d

    def push_serialized(self, namespace, targets='all', block=True):
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        return self._call_and_unpackage('push_serialized', serial, targets, block)

    def pull_serialized(self, keys, targets='all', block=True):
        return self._call_and_unpackage('pull_serialized', keys, targets, block)

    def get_result(self, i=None, targets='all', block=True):
        # None is sent as the string 'None' (the original note here says None
        # cannot be marshalled by xml-rpc); the controller converts it back.
        if i is None:
            i = 'None'
        return self._call_and_unpackage('get_result', i, targets, block)

    def reset(self, targets='all', block=True):
        return self._call_and_unpackage('reset', targets, block)

    def keys(self, targets='all', block=True):
        return self._call_and_unpackage('keys', targets, block)

    def kill(self, controller=False, targets='all', block=True):
        return self._call_and_unpackage('kill', controller, targets, block)

    def clear_queue(self, targets='all', block=True):
        return self._call_and_unpackage('clear_queue', targets, block)

    def queue_status(self, targets='all', block=True):
        return self._call_and_unpackage('queue_status', targets, block)

    def set_properties(self, properties, targets='all', block=True):
        serial = pickle.dumps(properties, 2)
        return self._call_and_unpackage('set_properties', serial, targets, block)

    def get_properties(self, keys=None, targets='all', block=True):
        # None is sent as the string 'None'; the controller converts it back.
        if keys is None:
            keys = 'None'
        return self._call_and_unpackage('get_properties', keys, targets, block)

    def has_properties(self, keys, targets='all', block=True):
        return self._call_and_unpackage('has_properties', keys, targets, block)

    def del_properties(self, keys, targets='all', block=True):
        return self._call_and_unpackage('del_properties', keys, targets, block)

    def clear_properties(self, targets='all', block=True):
        return self._call_and_unpackage('clear_properties', targets, block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        # The remote `get_ids` is not packaged on the controller side, so the
        # reply is returned without unpickling.
        d = self.remote_reference.callRemote('get_ids')
        return d

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineCoordinator related methods
    #---------------------------------------------------------------------------

    def _process_targets(self, targets):
        """Resolve `targets` into a validated list of engine ids.

        `targets` may be an int, the string 'all', or a list/tuple of ids.
        Returns a deferred that fires with the engine ids to use, or fails
        with `error.InvalidEngineID` if an id is not registered or `targets`
        has an unsupported form.
        """
        def create_targets(ids):
            if isinstance(targets, int):
                engines = [targets]
            elif targets == 'all':
                engines = ids
            elif isinstance(targets, (list, tuple)):
                engines = targets
            else:
                # Previously this fell through and failed with an
                # UnboundLocalError on `engines`.
                raise error.InvalidEngineID(
                    "targets must be an int, a list/tuple of ints or 'all': %r" % (targets,))
            for t in engines:
                if not t in ids:
                    raise error.InvalidEngineID("engine with id %r does not exist" % (t,))
            return engines

        d = self.get_ids()
        d.addCallback(create_targets)
        return d

    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition `seq` over the target engines and push each part as `key`.

        With `flatten` set, a partition holding exactly one element is pushed
        as that element rather than as a one-element partition.  In blocking
        mode the returned deferred fires with the per-engine results; in
        non-blocking mode it fires with a local deferred id.
        """

        # Note: scatter and gather handle pending deferreds locally through self.pdm.
        # This enables us to collect a bunch of deferred ids and make a secondary
        # deferred id that corresponds to the entire group. This logic is extremely
        # difficult to get right though.
        def do_scatter(engines):
            nEngines = len(engines)
            mapClass = Map.dists[dist]
            mapObject = mapClass()
            d_list = []
            # Loop through and push to each engine in non-blocking mode.
            # This returns a set of deferreds to deferred_ids
            for index, engineid in enumerate(engines):
                partition = mapObject.getPartition(seq, index, nEngines)
                if flatten and len(partition) == 1:
                    d = self.push({key: partition[0]}, targets=engineid, block=False)
                else:
                    d = self.push({key: partition}, targets=engineid, block=False)
                d_list.append(d)
            # Collect the deferred to deferred_ids
            d = gatherBoth(d_list,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            # Now d has a list of deferred_ids or Failures coming
            d.addCallback(error.collect_exceptions, 'scatter')

            def process_did_list(did_list):
                """Turn a list of deferred_ids into a final result or failure."""
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list,
                                     fireOnOneErrback=0,
                                     consumeErrors=1,
                                     logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'scatter')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                return final_d

            # Now, depending on block, we need to handle the list deferred_ids
            # coming down the pipe differently.
            if block:
                # If we are blocking register a callback that will transform the
                # list of deferred_ids into the final result.
                d.addCallback(process_did_list)
                return d
            else:
                # Here we are going to use a _local_ PendingDeferredManager.
                deferred_id = self.pdm.get_deferred_id()
                # This is the deferred we will return to the user that will fire
                # with the local deferred_id AFTER we have received the list of
                # primary deferred_ids
                d_to_return = defer.Deferred()

                def do_it(did_list):
                    """Produce a deferred to the final result, but first fire the
                    deferred we will return to the user that has the local
                    deferred id."""
                    d_to_return.callback(deferred_id)
                    return process_did_list(did_list)

                # NOTE(review): do_it only runs on success, so if `d` fails
                # before this point d_to_return is never fired -- confirm
                # whether callers can be left waiting in that case.
                d.addCallback(do_it)
                # Now save the deferred to the final result
                self.pdm.save_pending_deferred(d, deferred_id)
                return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_scatter)
        return d

    def gather(self, key, dist='b', targets='all', block=True):
        """Pull `key` from the target engines and join the parts.

        The per-engine values are joined with the `joinPartitions` method of
        the map object selected by `dist`.  In blocking mode the returned
        deferred fires with the joined result; in non-blocking mode it fires
        with a local deferred id.
        """

        # Note: scatter and gather handle pending deferreds locally through self.pdm.
        # This enables us to collect a bunch of deferred ids and make a secondary
        # deferred id that corresponds to the entire group. This logic is extremely
        # difficult to get right though.
        def do_gather(engines):
            mapClass = Map.dists[dist]
            mapObject = mapClass()
            # Pull from each engine in non-blocking mode.
            # This returns a set of deferreds to deferred_ids
            d_list = [self.pull(key, targets=engineid, block=False)
                      for engineid in engines]
            # Collect the deferred to deferred_ids
            d = gatherBoth(d_list,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            # Now d has a list of deferred_ids or Failures coming.  The label
            # used to read 'scatter' here, a copy-paste slip.
            d.addCallback(error.collect_exceptions, 'gather')

            def process_did_list(did_list):
                """Turn a list of deferred_ids into a final result or failure."""
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list,
                                     fireOnOneErrback=0,
                                     consumeErrors=1,
                                     logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'gather')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                final_d.addCallback(mapObject.joinPartitions)
                return final_d

            # Now, depending on block, we need to handle the list deferred_ids
            # coming down the pipe differently.
            if block:
                # If we are blocking register a callback that will transform the
                # list of deferred_ids into the final result.
                d.addCallback(process_did_list)
                return d
            else:
                # Here we are going to use a _local_ PendingDeferredManager.
                deferred_id = self.pdm.get_deferred_id()
                # This is the deferred we will return to the user that will fire
                # with the local deferred_id AFTER we have received the list of
                # primary deferred_ids
                d_to_return = defer.Deferred()

                def do_it(did_list):
                    """Produce a deferred to the final result, but first fire the
                    deferred we will return to the user that has the local
                    deferred id."""
                    d_to_return.callback(deferred_id)
                    return process_did_list(did_list)

                # NOTE(review): do_it only runs on success, so if `d` fails
                # before this point d_to_return is never fired -- confirm
                # whether callers can be left waiting in that case.
                d.addCallback(do_it)
                # Now save the deferred to the final result
                self.pdm.save_pending_deferred(d, deferred_id)
                return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_gather)
        return d

    def raw_map(self, func, sequences, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin map.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.

        `func` is either a function, which is pushed to the engines, or a
        string naming something callable on the engines.  Raises `TypeError`
        if `sequences` is not a list/tuple or `func` has another type, and
        `ValueError` if the sequences differ in length.
        """
        if not isinstance(sequences, (list, tuple)):
            raise TypeError('sequences must be a list or tuple')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        if isinstance(func, FunctionType):
            d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
            d.addCallback(lambda did: self.get_pending_deferred(did, True))
            sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
        elif isinstance(func, str):
            d = defer.succeed(None)
            sourceToRun = \
                '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
        else:
            raise TypeError("func must be a function or str")
        # Scatter the zipped arguments, run the map on every engine, wait for
        # it to finish, then gather the per-engine results.
        d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
        d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
        d.addCallback(lambda did: self.get_pending_deferred(did, True))
        d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
        return d

    def map(self, func, *sequences):
        """
        A parallel version of Python's builtin `map` function.

        This method applies a function to sequences of arguments.  It
        follows the same syntax as the builtin `map`.

        This method creates a mapper object by calling `self.mapper` with
        no arguments and then uses that mapper to do the mapping.  See
        the documentation of `mapper` for more details.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, dist='b', targets='all', block=True):
        """
        Create a mapper object that has a `map` method.

        This method returns an object that implements the `IMapper`
        interface.  This method is a factory that is used to control how
        the map happens.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        return MultiEngineMapper(self, dist, targets, block)

    def parallel(self, dist='b', targets='all', block=True):
        """
        A decorator that turns a function into a parallel function.

        This can be used as:

            @parallel()
            def f(x, y)
                ...

            f(range(10), range(10))

        This causes f(0,0), f(1,1), ... to be called in parallel.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        mapper = self.mapper(dist, targets, block)
        pf = ParallelFunction(mapper)
        return pf

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineExtras related methods
    #---------------------------------------------------------------------------

    def _transformPullResult(self, pushResult, multitargets, lenKeys):
        """Reshape a pull result for `zip_pull`.

        Returns ``pushResult[0]`` when there is a single target,
        ``zip(*pushResult)`` for several keys, and ``list(pushResult)``
        otherwise.
        """
        if not multitargets:
            return pushResult[0]
        if lenKeys > 1:
            return zip(*pushResult)
        # Covers lenKeys == 1 (formerly tested with `is 1`, an identity
        # check) and the lenKeys == 0 case that used to hit an unbound local.
        return list(pushResult)

    def zip_pull(self, keys, targets='all', block=True):
        """Pull `keys` and reshape the result with `_transformPullResult`."""
        # Any non-int `targets` longer than one element counts as several
        # targets; note that this includes the default string 'all'.
        multitargets = not isinstance(targets, int) and len(targets) > 1
        lenKeys = len(keys)
        d = self.pull(keys, targets=targets, block=block)
        if block:
            d.addCallback(self._transformPullResult, multitargets, lenKeys)
        else:
            # The deferred fires with a deferred id here, so the reshaping is
            # postponed until that id is retrieved.
            d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
        return d

    def run(self, fname, targets='all', block=True):
        """Read the file `fname` and execute its source on the engines.

        The source is compiled locally first, purely as a check: if that
        fails, a failed deferred is returned and nothing is sent.  Errors
        opening or reading the file are raised directly.
        """
        fileobj = open(fname, 'r')
        try:
            source = fileobj.read()
        finally:
            # Close the file even if reading raises.
            fileobj.close()

        # if the compilation blows, we get a local error right away
        try:
            compile(source, fname, 'exec')
        except Exception:
            return defer.fail(failure.Failure())

        # Now run the code
        d = self.execute(source, targets=targets, block=block)
        return d

    #---------------------------------------------------------------------------
    # IBlockingClientAdaptor related methods
    #---------------------------------------------------------------------------

    def adapt_to_blocking_client(self):
        """Return this client adapted to `IFullBlockingMultiEngineClient`."""
        # Imported here rather than at module level -- presumably to avoid a
        # circular import; TODO confirm.
        from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
        return IFullBlockingMultiEngineClient(self)