# encoding: utf-8

"""
Expose the multiengine controller over the Foolscap network protocol.
"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
# Copyright (C) 2008  The IPython Development Team
#
# Distributed under the terms of the BSD License.  The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import cPickle as pickle
from types import FunctionType

from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure, log

try:
    # This is preferred in foolscap > 0.4.3
    from foolscap.api import Referenceable
except ImportError:
    # Fallback for older versions
    from foolscap import Referenceable

from IPython.kernel import error
from IPython.kernel.util import printer
from IPython.kernel import map as Map
from IPython.kernel.parallelfunction import ParallelFunction
from IPython.kernel.mapper import (
    MultiEngineMapper,
    IMultiEngineMapperFactory,
    IMapper
)
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.multiengine import (MultiEngine,
                                        IMultiEngine,
                                        IFullSynchronousMultiEngine,
                                        ISynchronousMultiEngine)
from IPython.kernel.multiengineclient import wrapResultList
from IPython.kernel.pendingdeferred import PendingDeferredManager
from IPython.kernel.pickleutil import (can, canDict,
    canSequence, uncan, uncanDict, uncanSequence)

from IPython.kernel.clientinterfaces import (
    IFCClientInterfaceProvider,
    IBlockingClientAdaptor
)

# Needed to access the true globals from __main__.__dict__
import __main__


#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------

def packageResult(wrappedMethod):
    """Decorator: pickle the eventual result (or failure) of a remote_* method."""

    def wrappedPackageResult(self, *args, **kwargs):
        d = wrappedMethod(self, *args, **kwargs)
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d
    return wrappedPackageResult
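
# A minimal sketch of how `packageResult` is meant to be used (hypothetical
# class and method names, for illustration only).  The decorated method's
# Deferred fires with a pickle of either the result or a cleaned Failure:
#
#     class Example(Referenceable):
#         def packageSuccess(self, obj):
#             return pickle.dumps(obj, 2)
#
#         def packageFailure(self, f):
#             f.cleanFailure()
#             return self.packageSuccess(f)
#
#         @packageResult
#         def remote_answer(self):
#             return defer.succeed(42)  # caller receives pickle.dumps(42, 2)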


class IFCSynchronousMultiEngine(Interface):
    """Foolscap interface to `ISynchronousMultiEngine`.

    The methods in this interface are similar to those of
    `ISynchronousMultiEngine`, but their arguments and return values are
    pickled if they are not already simple Python types that can be sent
    over the wire.

    See the documentation of `ISynchronousMultiEngine` and `IMultiEngine`
    for documentation about the methods.

    Most methods in this interface act like the `ISynchronousMultiEngine`
    versions and can be called in blocking or non-blocking mode.
    """
    pass


class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
    """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
    """

    implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)

    addSlash = True

    def __init__(self, multiengine):
        # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
        # it.  This allows this class to do two adaptation steps.
        self.smultiengine = ISynchronousMultiEngine(multiengine)
        self._deferredIDCallbacks = {}

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        serial = pickle.dumps(obj, 2)
        return serial

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    @packageResult
    def remote_get_pending_deferred(self, deferredID, block):
        d = self.smultiengine.get_pending_deferred(deferredID, block)
        try:
            callback = self._deferredIDCallbacks.pop(deferredID)
        except KeyError:
            callback = None
        if callback is not None:
            d.addCallback(callback[0], *callback[1], **callback[2])
        return d

    @packageResult
    def remote_clear_pending_deferreds(self):
        return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did
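
    # Sketch of the deferred-id callback mechanism (illustrative only).  A
    # remote_* method run with block=False returns a deferred_id right away;
    # registering a callback against that id makes the callback fire later,
    # when the client fetches the pending deferred:
    #
    #     did = self._addDeferredIDCallback(did, canSequence)
    #     # ... time passes ...
    #     # remote_get_pending_deferred(did, True) now applies canSequence
    #     # to the result before it is pickled and returned.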

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    @packageResult
    def remote_execute(self, lines, targets, block):
        return self.smultiengine.execute(lines, targets=targets, block=block)

    @packageResult
    def remote_push(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push(namespace, targets=targets, block=block)
        return d

    @packageResult
    def remote_pull(self, keys, targets, block):
        d = self.smultiengine.pull(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_push_function(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except:
            d = defer.fail(failure.Failure())
        else:
            # Functions arrive canned; uncan them before pushing.
            namespace = uncanDict(namespace)
            d = self.smultiengine.push_function(namespace, targets=targets, block=block)
        return d

    def _canMultipleKeys(self, result):
        return [canSequence(r) for r in result]

    @packageResult
    def remote_pull_function(self, keys, targets, block):
        def can_functions(r, keys):
            if len(keys) == 1 or isinstance(keys, str):
                result = canSequence(r)
            elif len(keys) > 1:
                result = [canSequence(s) for s in r]
            return result
        d = self.smultiengine.pull_function(keys, targets=targets, block=block)
        if block:
            d.addCallback(can_functions, keys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
        return d

    @packageResult
    def remote_push_serialized(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
        return d

    @packageResult
    def remote_pull_serialized(self, keys, targets, block):
        d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_result(self, i, targets, block):
        if i == 'None':
            i = None
        return self.smultiengine.get_result(i, targets=targets, block=block)

    @packageResult
    def remote_reset(self, targets, block):
        return self.smultiengine.reset(targets=targets, block=block)

    @packageResult
    def remote_keys(self, targets, block):
        return self.smultiengine.keys(targets=targets, block=block)

    @packageResult
    def remote_kill(self, controller, targets, block):
        return self.smultiengine.kill(controller, targets=targets, block=block)

    @packageResult
    def remote_clear_queue(self, targets, block):
        return self.smultiengine.clear_queue(targets=targets, block=block)

    @packageResult
    def remote_queue_status(self, targets, block):
        return self.smultiengine.queue_status(targets=targets, block=block)

    @packageResult
    def remote_set_properties(self, binaryNS, targets, block):
        try:
            ns = pickle.loads(binaryNS)
        except:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.set_properties(ns, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_properties(self, keys, targets, block):
        if keys == 'None':
            keys = None
        return self.smultiengine.get_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_has_properties(self, keys, targets, block):
        return self.smultiengine.has_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_del_properties(self, keys, targets, block):
        return self.smultiengine.del_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_clear_properties(self, targets, block):
        return self.smultiengine.clear_properties(targets=targets, block=block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def remote_get_ids(self):
        """Get the ids of the registered engines.

        This method always blocks.
        """
        return self.smultiengine.get_ids()

    #---------------------------------------------------------------------------
    # IFCClientInterfaceProvider related methods
    #---------------------------------------------------------------------------

    def remote_get_client_name(self):
        return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'


# The __init__ method of `FCSynchronousMultiEngineFromMultiEngine` first adapts
# the `IMultiEngine` to `ISynchronousMultiEngine`, so this is actually doing a
# two phase adaptation.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
                           IMultiEngine, IFCSynchronousMultiEngine)
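
# A minimal sketch of what the registered adapter enables (hypothetical
# `multiengine` instance, for illustration only): adapting an `IMultiEngine`
# directly to the Foolscap-facing interface performs both steps at once.
#
#     fc_referenceable = IFCSynchronousMultiEngine(multiengine)
#     # equivalent to FCSynchronousMultiEngineFromMultiEngine(multiengine),
#     # whose __init__ wraps multiengine in ISynchronousMultiEngine(...)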


#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------

class FCFullSynchronousMultiEngineClient(object):

    implements(
        IFullSynchronousMultiEngine,
        IBlockingClientAdaptor,
        IMultiEngineMapperFactory,
        IMapper
    )

    def __init__(self, remote_reference):
        self.remote_reference = remote_reference
        self._deferredIDCallbacks = {}
        # This class manages some pending deferreds through this instance.
        # This is required for methods like gather/scatter as it enables us
        # to create our own pending deferreds for composite operations.
        self.pdm = PendingDeferredManager()

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        return pickle.loads(r)

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    def get_pending_deferred(self, deferredID, block=True):
        # Because we are managing some pending deferreds locally (through
        # self.pdm) and some remotely (on the controller), we first try the
        # local one and then the remote one.
        if self.pdm.quick_has_id(deferredID):
            d = self.pdm.get_pending_deferred(deferredID, block)
            return d
        else:
            d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
            d.addCallback(self.unpackage)
            try:
                callback = self._deferredIDCallbacks.pop(deferredID)
            except KeyError:
                callback = None
            if callback is not None:
                d.addCallback(callback[0], *callback[1], **callback[2])
            return d

    def clear_pending_deferreds(self):
        # This clears both the local (self.pdm) and remote pending deferreds.
        self.pdm.clear_pending_deferreds()
        d2 = self.remote_reference.callRemote('clear_pending_deferreds')
        d2.addCallback(self.unpackage)
        return d2

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did
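
    # Sketch of the non-blocking call pattern this machinery supports
    # (hypothetical `client` instance, for illustration only):
    #
    #     d = client.execute('a = 10', targets='all', block=False)
    #     # d fires with a deferred_id, not the result...
    #     d.addCallback(lambda did: client.get_pending_deferred(did, True))
    #     # ...and this second call fires with the actual execute result.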

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all', block=True):
        d = self.remote_reference.callRemote('execute', lines, targets, block)
        d.addCallback(self.unpackage)
        return d

    def push(self, namespace, targets='all', block=True):
        serial = pickle.dumps(namespace, 2)
        d = self.remote_reference.callRemote('push', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('pull', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def push_function(self, namespace, targets='all', block=True):
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        d = self.remote_reference.callRemote('push_function', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull_function(self, keys, targets='all', block=True):
        def uncan_functions(r, keys):
            if len(keys) == 1 or isinstance(keys, str):
                return uncanSequence(r)
            elif len(keys) > 1:
                return [uncanSequence(s) for s in r]
        d = self.remote_reference.callRemote('pull_function', keys, targets, block)
        if block:
            d.addCallback(self.unpackage)
            d.addCallback(uncan_functions, keys)
        else:
            d.addCallback(self.unpackage)
            d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
        return d
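
    # Sketch of the canning round trip (hypothetical `client` and function,
    # for illustration only).  Interactively defined functions cannot be
    # pickled directly, so they are "canned" on the way out and "uncanned"
    # on the way back:
    #
    #     def square(x):
    #         return x * x
    #     client.push_function(dict(f=square))   # canDict + pickle
    #     d = client.pull_function('f')          # unpickle + uncan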

    def push_serialized(self, namespace, targets='all', block=True):
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull_serialized(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def get_result(self, i=None, targets='all', block=True):
        if i is None:
            # None cannot be marshalled over the wire, so the string 'None'
            # is sent in its place and translated back by the controller.
            i = 'None'
        d = self.remote_reference.callRemote('get_result', i, targets, block)
        d.addCallback(self.unpackage)
        return d

    def reset(self, targets='all', block=True):
        d = self.remote_reference.callRemote('reset', targets, block)
        d.addCallback(self.unpackage)
        return d

    def keys(self, targets='all', block=True):
        d = self.remote_reference.callRemote('keys', targets, block)
        d.addCallback(self.unpackage)
        return d

    def kill(self, controller=False, targets='all', block=True):
        d = self.remote_reference.callRemote('kill', controller, targets, block)
        d.addCallback(self.unpackage)
        return d

    def clear_queue(self, targets='all', block=True):
        d = self.remote_reference.callRemote('clear_queue', targets, block)
        d.addCallback(self.unpackage)
        return d

    def queue_status(self, targets='all', block=True):
        d = self.remote_reference.callRemote('queue_status', targets, block)
        d.addCallback(self.unpackage)
        return d

    def set_properties(self, properties, targets='all', block=True):
        serial = pickle.dumps(properties, 2)
        d = self.remote_reference.callRemote('set_properties', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def get_properties(self, keys=None, targets='all', block=True):
        if keys is None:
            keys = 'None'
        d = self.remote_reference.callRemote('get_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def has_properties(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('has_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def del_properties(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('del_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def clear_properties(self, targets='all', block=True):
        d = self.remote_reference.callRemote('clear_properties', targets, block)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        d = self.remote_reference.callRemote('get_ids')
        return d

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineCoordinator related methods
    #---------------------------------------------------------------------------

    def _process_targets(self, targets):
        def create_targets(ids):
            if isinstance(targets, int):
                engines = [targets]
            elif targets == 'all':
                engines = ids
            elif isinstance(targets, (list, tuple)):
                engines = targets
            else:
                raise error.InvalidEngineID(
                    "targets must be an int, a list/tuple of ints, or 'all'")
            for t in engines:
                if t not in ids:
                    raise error.InvalidEngineID("engine with id %r does not exist" % t)
            return engines

        d = self.get_ids()
        d.addCallback(create_targets)
        return d

    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
        # Note: scatter and gather handle pending deferreds locally through
        # self.pdm.  This enables us to collect a bunch of deferred ids and
        # make a secondary deferred id that corresponds to the entire group.
        # This logic is extremely difficult to get right though.
        def do_scatter(engines):
            nEngines = len(engines)
            mapClass = Map.dists[dist]
            mapObject = mapClass()
            d_list = []
            # Loop through and push to each engine in non-blocking mode.
            # This returns a set of deferreds to deferred_ids.
            for index, engineid in enumerate(engines):
                partition = mapObject.getPartition(seq, index, nEngines)
                if flatten and len(partition) == 1:
                    d = self.push({key: partition[0]}, targets=engineid, block=False)
                else:
                    d = self.push({key: partition}, targets=engineid, block=False)
                d_list.append(d)
            # Collect the deferreds to deferred_ids.
            d = gatherBoth(d_list,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            # Now d has a list of deferred_ids or Failures coming.
            d.addCallback(error.collect_exceptions, 'scatter')

            def process_did_list(did_list):
                """Turn a list of deferred_ids into a final result or failure."""
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list,
                                     fireOnOneErrback=0,
                                     consumeErrors=1,
                                     logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'scatter')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                return final_d

            # Now, depending on block, we need to handle the list of
            # deferred_ids coming down the pipe differently.
            if block:
                # If we are blocking, register a callback that will transform
                # the list of deferred_ids into the final result.
                d.addCallback(process_did_list)
                return d
            else:
                # Here we are going to use a _local_ PendingDeferredManager.
                deferred_id = self.pdm.get_deferred_id()
                # This is the deferred we will return to the user that will
                # fire with the local deferred_id AFTER we have received the
                # list of primary deferred_ids.
                d_to_return = defer.Deferred()

                def do_it(did_list):
                    """Produce a deferred to the final result, but first fire
                    the deferred we will return to the user that has the local
                    deferred id."""
                    d_to_return.callback(deferred_id)
                    return process_did_list(did_list)
                d.addCallback(do_it)
                # Now save the deferred to the final result.
                self.pdm.save_pending_deferred(d, deferred_id)
                return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_scatter)
        return d
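
    # Sketch of scatter in blocking mode (hypothetical `client`, for
    # illustration only): the 'b' (block) distribution splits the sequence
    # into one contiguous partition per engine:
    #
    #     d = client.scatter('x', range(8), dist='b', targets='all')
    #     # with 4 engines, engine 0 gets x = [0, 1], engine 1 gets
    #     # x = [2, 3], and so on; d fires once every push has completed.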

    def gather(self, key, dist='b', targets='all', block=True):
        # Note: scatter and gather handle pending deferreds locally through
        # self.pdm.  This enables us to collect a bunch of deferred ids and
        # make a secondary deferred id that corresponds to the entire group.
        # This logic is extremely difficult to get right though.
        def do_gather(engines):
            nEngines = len(engines)
            mapClass = Map.dists[dist]
            mapObject = mapClass()
            d_list = []
            # Loop through and pull from each engine in non-blocking mode.
            # This returns a set of deferreds to deferred_ids.
            for index, engineid in enumerate(engines):
                d = self.pull(key, targets=engineid, block=False)
                d_list.append(d)
            # Collect the deferreds to deferred_ids.
            d = gatherBoth(d_list,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            # Now d has a list of deferred_ids or Failures coming.
            d.addCallback(error.collect_exceptions, 'gather')

            def process_did_list(did_list):
                """Turn a list of deferred_ids into a final result or failure."""
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list,
                                     fireOnOneErrback=0,
                                     consumeErrors=1,
                                     logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'gather')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                final_d.addCallback(mapObject.joinPartitions)
                return final_d

            # Now, depending on block, we need to handle the list of
            # deferred_ids coming down the pipe differently.
            if block:
                # If we are blocking, register a callback that will transform
                # the list of deferred_ids into the final result.
                d.addCallback(process_did_list)
                return d
            else:
                # Here we are going to use a _local_ PendingDeferredManager.
                deferred_id = self.pdm.get_deferred_id()
                # This is the deferred we will return to the user that will
                # fire with the local deferred_id AFTER we have received the
                # list of primary deferred_ids.
                d_to_return = defer.Deferred()

                def do_it(did_list):
                    """Produce a deferred to the final result, but first fire
                    the deferred we will return to the user that has the local
                    deferred id."""
                    d_to_return.callback(deferred_id)
                    return process_did_list(did_list)
                d.addCallback(do_it)
                # Now save the deferred to the final result.
                self.pdm.save_pending_deferred(d, deferred_id)
                return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_gather)
        return d
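
    # Sketch of a scatter/gather round trip (hypothetical `client`, for
    # illustration only); gather re-joins the per-engine partitions with
    # mapObject.joinPartitions, inverting the scatter:
    #
    #     d = client.scatter('x', range(8))
    #     d.addCallback(lambda _: client.execute('y = [i**2 for i in x]'))
    #     d.addCallback(lambda _: client.gather('y'))
    #     # d eventually fires with [0, 1, 4, 9, 16, 25, 36, 49]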

    def raw_map(self, func, sequences, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin map.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """
        if not isinstance(sequences, (list, tuple)):
            raise TypeError('sequences must be a list or tuple')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        if isinstance(func, FunctionType):
            d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
            d.addCallback(lambda did: self.get_pending_deferred(did, True))
            sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
        elif isinstance(func, str):
            d = defer.succeed(None)
            sourceToRun = \
                '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
        else:
            raise TypeError("func must be a function or str")

        d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
        d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
        d.addCallback(lambda did: self.get_pending_deferred(did, True))
        d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
        return d
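
    # Sketch of raw_map's calling convention (hypothetical `client`, for
    # illustration only); note that the sequences go in a single list,
    # unlike the builtin map:
    #
    #     def add(a, b):
    #         return a + b
    #     d = client.raw_map(add, [range(4), range(4)])
    #     # d eventually fires with [0, 2, 4, 6]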

    def map(self, func, *sequences):
        """
        A parallel version of Python's builtin `map` function.

        This method applies a function to sequences of arguments.  It
        follows the same syntax as the builtin `map`.

        This method creates a mapper object by calling `self.mapper` with
        no arguments and then uses that mapper to do the mapping.  See
        the documentation of `mapper` for more details.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, dist='b', targets='all', block=True):
        """
        Create a mapper object that has a `map` method.

        This method returns an object that implements the `IMapper`
        interface.  This method is a factory that is used to control how
        the map happens.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        return MultiEngineMapper(self, dist, targets, block)
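
    # Sketch of the mapper factory in use (hypothetical `client`, for
    # illustration only):
    #
    #     m = client.mapper(dist='b', targets='all', block=True)
    #     d = m.map(lambda x: x * 2, range(10))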

    def parallel(self, dist='b', targets='all', block=True):
        """
        A decorator that turns a function into a parallel function.

        This can be used as:

        @parallel()
        def f(x, y):
            ...

        f(range(10), range(10))

        This causes f(0,0), f(1,1), ... to be called in parallel.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        mapper = self.mapper(dist, targets, block)
        pf = ParallelFunction(mapper)
        return pf

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineExtras related methods
    #---------------------------------------------------------------------------

    def _transformPullResult(self, pullResult, multitargets, lenKeys):
        if not multitargets:
            result = pullResult[0]
        elif lenKeys > 1:
            result = zip(*pullResult)
        elif lenKeys == 1:
            result = list(pullResult)
        return result

    def zip_pull(self, keys, targets='all', block=True):
        multitargets = not isinstance(targets, int) and len(targets) > 1
        lenKeys = len(keys)
        d = self.pull(keys, targets=targets, block=block)
        if block:
            d.addCallback(self._transformPullResult, multitargets, lenKeys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
        return d
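
    # Sketch of what zip_pull changes relative to pull (hypothetical
    # `client` with two engines, for illustration only):
    #
    #     client.pull(('a', 'b'))      # -> [[a0, b0], [a1, b1]]  (per engine)
    #     client.zip_pull(('a', 'b'))  # -> [(a0, a1), (b0, b1)]  (per key)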

    def run(self, fname, targets='all', block=True):
        fileobj = open(fname, 'r')
        source = fileobj.read()
        fileobj.close()
        # if the compilation blows, we get a local error right away
        try:
            code = compile(source, fname, 'exec')
        except:
            return defer.fail(failure.Failure())
        # Now run the code
        d = self.execute(source, targets=targets, block=block)
        return d

    #---------------------------------------------------------------------------
    # IBlockingClientAdaptor related methods
    #---------------------------------------------------------------------------

    def adapt_to_blocking_client(self):
        from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
        return IFullBlockingMultiEngineClient(self)