##// END OF EJS Templates
Minor robustness/speed improvements to process handling
Minor robustness/speed improvements to process handling

File last commit:

r2526:ec6b47e5
r3075:78f7387a
Show More
multienginefc.py
760 lines | 30.2 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
Expose the multiengine controller over the Foolscap network protocol.

The controller side is `FCSynchronousMultiEngineFromMultiEngine`, a Foolscap
`Referenceable` that adapts an `IMultiEngine`.  The client side is
`FCFullSynchronousMultiEngineClient`, which wraps a Foolscap remote reference
to it.  Most arguments and results are pickled while in transit.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
from types import FunctionType
from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure
try:
from foolscap.api import Referenceable
except ImportError:
from foolscap import Referenceable
from IPython.kernel import error
from IPython.kernel import map as Map
from IPython.kernel.parallelfunction import ParallelFunction
from IPython.kernel.mapper import (
MultiEngineMapper,
IMultiEngineMapperFactory,
IMapper
)
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.multiengine import (
IMultiEngine,
IFullSynchronousMultiEngine,
ISynchronousMultiEngine)
from IPython.kernel.pendingdeferred import PendingDeferredManager
from IPython.kernel.pickleutil import (
canDict,
canSequence, uncanDict, uncanSequence
)
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
# Needed to access the true globals from __main__.__dict__
import __main__
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
def packageResult(wrappedMethod):
    """Decorate a ``remote_*`` method so that its Deferred result is pickled.

    `wrappedMethod` must return a `Deferred`.  Its result is passed through
    ``self.packageSuccess``.  A failure goes through ``self.packageFailure``.
    The errback is added after the callback, so a failure raised by
    ``packageSuccess`` itself (e.g. an unpicklable result) is packaged too.
    The client therefore always receives a pickled object.

    The wrapper keeps the wrapped method's name and docstring.
    """
    from functools import wraps

    @wraps(wrappedMethod)
    def wrappedPackageResult(self, *args, **kwargs):
        d = wrappedMethod(self, *args, **kwargs)
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d
    return wrappedPackageResult
class IFCSynchronousMultiEngine(Interface):
    """Foolscap interface to `ISynchronousMultiEngine`.

    The methods in this interface are similar to those of
    `ISynchronousMultiEngine`.  Their arguments and return values are
    pickled, unless they are already simple Python types that Foolscap can
    send directly.

    See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
    details about the individual methods.

    Most methods in this interface act like the `ISynchronousMultiEngine`
    versions and can be called in blocking or non-blocking mode.
    """
    pass
class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
    """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.

    This is the controller-side Foolscap `Referenceable`.

    - The ``remote_*`` methods decorated with `packageResult` return a
      `Deferred`.  Its result or failure is pickled before it is sent back
      to the client.
    - Arguments named ``binaryNS`` arrive pickled and are unpickled here.
    """

    implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)

    addSlash = True

    def __init__(self, multiengine):
        # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
        # it. This allows this class to do two adaptation steps.
        self.smultiengine = ISynchronousMultiEngine(multiengine)
        # deferred id -> (callback, args, kwargs).  This is post-processing
        # registered by a non-blocking call.  It is applied when the result
        # is fetched through `remote_get_pending_deferred`.
        self._deferredIDCallbacks = {}

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        """Clean failure `f` (`Failure.cleanFailure`) and pickle it."""
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        """Pickle `obj` with protocol 2 for transmission to the client."""
        return pickle.dumps(obj, 2)

    def _unpickleAndCall(self, binaryNS, method, *args, **kwargs):
        """Unpickle `binaryNS` and return ``method(obj, *args, **kwargs)``.

        If unpickling fails, the error is returned as a failed `Deferred`
        instead of being raised.  `packageResult` then ships it to the client
        like any other failure.  Exceptions raised by `method` itself
        propagate unchanged.

        NOTE(review): `binaryNS` comes from the network, and `pickle.loads`
        on untrusted data can execute arbitrary code.  Only trusted clients
        should be able to reach this object.
        """
        try:
            obj = pickle.loads(binaryNS)
        except Exception:
            return defer.fail(failure.Failure())
        return method(obj, *args, **kwargs)

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    @packageResult
    def remote_get_pending_deferred(self, deferredID, block):
        d = self.smultiengine.get_pending_deferred(deferredID, block)
        # Apply, at most once, any post-processing registered for this id by
        # an earlier non-blocking call (e.g. canning in remote_pull_function).
        # NOTE(review): the callback is popped even when block is False.  If
        # the result is not ready yet and the id stays valid, the
        # post-processing is lost for the later fetch.  Confirm this against
        # PendingDeferredManager.
        callback = self._deferredIDCallbacks.pop(deferredID, None)
        if callback is not None:
            func, args, kwargs = callback
            d.addCallback(func, *args, **kwargs)
        return d

    @packageResult
    def remote_clear_pending_deferreds(self):
        # Take the ids now, so that callbacks registered while the clear is
        # in progress are not discarded.
        stale_ids = list(self._deferredIDCallbacks)

        def forget_stale_callbacks(result):
            # Without this, the entries for cleared ids would stay in
            # _deferredIDCallbacks forever.
            # NOTE(review): this assumes cleared ids can never be fetched
            # again.  Confirm this against
            # PendingDeferredManager.clear_pending_deferreds.
            for did in stale_ids:
                self._deferredIDCallbacks.pop(did, None)
            return result

        d = defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
        d.addCallback(forget_stale_callbacks)
        return d

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Register ``callback(result, *args, **kwargs)`` for deferred id `did`.

        Returns `did`, so it can be used directly as a Deferred callback.
        """
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    @packageResult
    def remote_execute(self, lines, targets, block):
        return self.smultiengine.execute(lines, targets=targets, block=block)

    @packageResult
    def remote_push(self, binaryNS, targets, block):
        return self._unpickleAndCall(binaryNS, self.smultiengine.push,
                                     targets=targets, block=block)

    @packageResult
    def remote_pull(self, keys, targets, block):
        return self.smultiengine.pull(keys, targets=targets, block=block)

    @packageResult
    def remote_push_function(self, binaryNS, targets, block):
        def push_uncanned(namespace):
            # The client cans functions before pickling them.  Undo that.
            return self.smultiengine.push_function(uncanDict(namespace),
                                                   targets=targets, block=block)
        return self._unpickleAndCall(binaryNS, push_uncanned)

    def _canMultipleKeys(self, result):
        return [canSequence(r) for r in result]

    @packageResult
    def remote_pull_function(self, keys, targets, block):
        def can_functions(r, keys):
            # A single key (a str or a one-element sequence) gives one result
            # to can.  Otherwise each item of `r` is canned separately.  The
            # second branch also covers empty `keys`, which previously
            # raised UnboundLocalError.
            if isinstance(keys, str) or len(keys) == 1:
                return canSequence(r)
            return [canSequence(s) for s in r]
        d = self.smultiengine.pull_function(keys, targets=targets, block=block)
        if block:
            d.addCallback(can_functions, keys)
        else:
            # d fires with a deferred id.  Can the real result when the
            # client fetches it with remote_get_pending_deferred.
            d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
        return d

    @packageResult
    def remote_push_serialized(self, binaryNS, targets, block):
        return self._unpickleAndCall(binaryNS, self.smultiengine.push_serialized,
                                     targets=targets, block=block)

    @packageResult
    def remote_pull_serialized(self, keys, targets, block):
        return self.smultiengine.pull_serialized(keys, targets=targets, block=block)

    @packageResult
    def remote_get_result(self, i, targets, block):
        # The client sends the string 'None' in place of None.
        if i == 'None':
            i = None
        return self.smultiengine.get_result(i, targets=targets, block=block)

    @packageResult
    def remote_reset(self, targets, block):
        return self.smultiengine.reset(targets=targets, block=block)

    @packageResult
    def remote_keys(self, targets, block):
        return self.smultiengine.keys(targets=targets, block=block)

    @packageResult
    def remote_kill(self, controller, targets, block):
        return self.smultiengine.kill(controller, targets=targets, block=block)

    @packageResult
    def remote_clear_queue(self, targets, block):
        return self.smultiengine.clear_queue(targets=targets, block=block)

    @packageResult
    def remote_queue_status(self, targets, block):
        return self.smultiengine.queue_status(targets=targets, block=block)

    @packageResult
    def remote_set_properties(self, binaryNS, targets, block):
        return self._unpickleAndCall(binaryNS, self.smultiengine.set_properties,
                                     targets=targets, block=block)

    @packageResult
    def remote_get_properties(self, keys, targets, block):
        # The client sends the string 'None' in place of None.
        if keys == 'None':
            keys = None
        return self.smultiengine.get_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_has_properties(self, keys, targets, block):
        return self.smultiengine.has_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_del_properties(self, keys, targets, block):
        return self.smultiengine.del_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_clear_properties(self, targets, block):
        return self.smultiengine.clear_properties(targets=targets, block=block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def remote_get_ids(self):
        """Get the ids of the registered engines.

        This method always blocks, and its result is not pickled.
        """
        return self.smultiengine.get_ids()

    #---------------------------------------------------------------------------
    # IFCClientInterfaceProvider related methods
    #---------------------------------------------------------------------------

    def remote_get_client_name(self):
        """Return the dotted name of the client class to use with this object."""
        return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
# The __init__ method of `FCSynchronousMultiEngineFromMultiEngine` first adapts
# the `IMultiEngine` to `ISynchronousMultiEngine`, so this is actually a
# two-phase adaptation: IMultiEngine -> ISynchronousMultiEngine ->
# IFCSynchronousMultiEngine.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
                           IMultiEngine, IFCSynchronousMultiEngine)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCFullSynchronousMultiEngineClient(object):
    """Client-side proxy for a remote `FCSynchronousMultiEngineFromMultiEngine`.

    - Arguments that are not simple types are pickled before being sent
      with ``callRemote``.
    - Pickled results from the controller are unpickled with `unpackage`.
    - Non-blocking `scatter` and `gather` are tracked locally through
      `self.pdm`.  All other deferred ids live on the controller.
    """

    implements(
        IFullSynchronousMultiEngine,
        IBlockingClientAdaptor,
        IMultiEngineMapperFactory,
        IMapper
    )

    def __init__(self, remote_reference):
        self.remote_reference = remote_reference
        # deferred id -> (callback, args, kwargs).  This post-processing is
        # applied when the result for that id is fetched via
        # get_pending_deferred.
        self._deferredIDCallbacks = {}
        # This class manages some pending deferreds through this instance. This
        # is required for methods like gather/scatter as it enables us to
        # create our own pending deferreds for composite operations.
        self.pdm = PendingDeferredManager()

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        """Unpickle a result sent by the controller.

        NOTE(review): this unpickles data received over the network, so the
        controller must be trusted.
        """
        return pickle.loads(r)

    def _callAndUnpackage(self, methodName, *args):
        """Call ``remote_<methodName>`` on the controller and unpickle the result."""
        d = self.remote_reference.callRemote(methodName, *args)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    def get_pending_deferred(self, deferredID, block=True):
        # Because we are managing some pending deferreds locally (through
        # self.pdm) and some remotely (on the controller), we first try the
        # local one and then the remote one.
        if self.pdm.quick_has_id(deferredID):
            return self.pdm.get_pending_deferred(deferredID, block)
        d = self._callAndUnpackage('get_pending_deferred', deferredID, block)
        # Apply, at most once, any post-processing registered for this id.
        callback = self._deferredIDCallbacks.pop(deferredID, None)
        if callback is not None:
            func, args, kwargs = callback
            d.addCallback(func, *args, **kwargs)
        return d

    def clear_pending_deferreds(self):
        # This clears both the local (self.pdm) and remote pending deferreds.
        self.pdm.clear_pending_deferreds()
        # Take the ids now, so that callbacks registered by calls made while
        # the remote clear is in flight are not discarded.
        stale_ids = list(self._deferredIDCallbacks)

        def forget_stale_callbacks(result):
            # Without this, callbacks for cleared ids would never be removed.
            # NOTE(review): this assumes cleared ids cannot be fetched again.
            # Confirm this.
            for did in stale_ids:
                self._deferredIDCallbacks.pop(did, None)
            return result

        d = self._callAndUnpackage('clear_pending_deferreds')
        d.addCallback(forget_stale_callbacks)
        return d

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Register ``callback(result, *args, **kwargs)`` for deferred id `did`.

        Returns `did`, so it can be used directly as a Deferred callback.
        """
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all', block=True):
        return self._callAndUnpackage('execute', lines, targets, block)

    def push(self, namespace, targets='all', block=True):
        serial = pickle.dumps(namespace, 2)
        return self._callAndUnpackage('push', serial, targets, block)

    def pull(self, keys, targets='all', block=True):
        return self._callAndUnpackage('pull', keys, targets, block)

    def push_function(self, namespace, targets='all', block=True):
        # Functions are canned so that they can be pickled.
        serial = pickle.dumps(canDict(namespace), 2)
        return self._callAndUnpackage('push_function', serial, targets, block)

    def pull_function(self, keys, targets='all', block=True):
        def uncan_functions(r, keys):
            # A single key (a str or a one-element sequence) gives one result
            # to uncan.  Otherwise each item of `r` is uncanned separately.
            if isinstance(keys, str) or len(keys) == 1:
                return uncanSequence(r)
            return [uncanSequence(s) for s in r]
        d = self._callAndUnpackage('pull_function', keys, targets, block)
        if block:
            d.addCallback(uncan_functions, keys)
        else:
            # d fires with a deferred id.  Uncan when the result is fetched.
            d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
        return d

    def push_serialized(self, namespace, targets='all', block=True):
        # NOTE(review): the namespace is canned here, but
        # remote_push_serialized on the controller does not uncan it.
        # Presumably canDict leaves serialized objects untouched.  Confirm.
        serial = pickle.dumps(canDict(namespace), 2)
        return self._callAndUnpackage('push_serialized', serial, targets, block)

    def pull_serialized(self, keys, targets='all', block=True):
        return self._callAndUnpackage('pull_serialized', keys, targets, block)

    def get_result(self, i=None, targets='all', block=True):
        if i is None:  # This is because None cannot be marshalled by xml-rpc
            i = 'None'
        return self._callAndUnpackage('get_result', i, targets, block)

    def reset(self, targets='all', block=True):
        return self._callAndUnpackage('reset', targets, block)

    def keys(self, targets='all', block=True):
        return self._callAndUnpackage('keys', targets, block)

    def kill(self, controller=False, targets='all', block=True):
        return self._callAndUnpackage('kill', controller, targets, block)

    def clear_queue(self, targets='all', block=True):
        return self._callAndUnpackage('clear_queue', targets, block)

    def queue_status(self, targets='all', block=True):
        return self._callAndUnpackage('queue_status', targets, block)

    def set_properties(self, properties, targets='all', block=True):
        serial = pickle.dumps(properties, 2)
        return self._callAndUnpackage('set_properties', serial, targets, block)

    def get_properties(self, keys=None, targets='all', block=True):
        # None is sent as the string 'None'.  The controller translates it back.
        if keys is None:
            keys = 'None'
        return self._callAndUnpackage('get_properties', keys, targets, block)

    def has_properties(self, keys, targets='all', block=True):
        return self._callAndUnpackage('has_properties', keys, targets, block)

    def del_properties(self, keys, targets='all', block=True):
        return self._callAndUnpackage('del_properties', keys, targets, block)

    def clear_properties(self, targets='all', block=True):
        return self._callAndUnpackage('clear_properties', targets, block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        # remote_get_ids does not pickle its result, so there is nothing to unpackage.
        return self.remote_reference.callRemote('get_ids')

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineCoordinator related methods
    #---------------------------------------------------------------------------

    def _process_targets(self, targets):
        """Return a Deferred to the list of engine ids selected by `targets`.

        `targets` may be an int, a list/tuple of ints, or 'all'.  The Deferred
        errbacks with `error.InvalidEngineID` in two cases:

        - an id is not registered;
        - `targets` has any other form.  This previously failed with
          UnboundLocalError.
        """
        def create_targets(ids):
            if isinstance(targets, int):
                engines = [targets]
            elif targets == 'all':
                engines = ids
            elif isinstance(targets, (list, tuple)):
                engines = targets
            else:
                raise error.InvalidEngineID(
                    "targets must be an int, a list/tuple of ints or 'all', got %r" % (targets,))
            for t in engines:
                if t not in ids:
                    raise error.InvalidEngineID("engine with id %r does not exist" % (t,))
            return engines
        d = self.get_ids()
        d.addCallback(create_targets)
        return d

    def _collect_group_results(self, d_list, opname, block, finalize=None):
        """Turn Deferreds to per-engine deferred ids into one group result.

        Note: scatter and gather handle pending deferreds locally through
        self.pdm.  This enables us to collect a bunch of deferred ids and make
        a secondary deferred id that corresponds to the entire group.  This
        logic is extremely difficult to get right though.

        :Parameters:
            d_list : list of Deferreds
                Each fires with a deferred id from a non-blocking call that
                targeted a single engine.
            opname : str
                Operation name passed to `error.collect_exceptions`.
            block : bool
                If true, return a Deferred to the final result.  Otherwise
                return a Deferred to a *local* deferred id managed by
                `self.pdm`.
            finalize : callable or None
                Optional callback applied to the list of per-engine results.
        """
        def gather_all(deferreds):
            gd = gatherBoth(deferreds,
                            fireOnOneErrback=0,
                            consumeErrors=1,
                            logErrors=0)
            gd.addCallback(error.collect_exceptions, opname)
            return gd

        def process_did_list(did_list):
            """Turn a list of deferred_ids into a final result or failure."""
            final_d = gather_all([self.get_pending_deferred(did, True) for did in did_list])
            # Each engine was targeted alone.  Presumably its result is a
            # one-element list, so keep only that element.
            final_d.addCallback(lambda lop: [i[0] for i in lop])
            if finalize is not None:
                final_d.addCallback(finalize)
            return final_d

        # d will fire with the list of primary deferred_ids, or fail.
        d = gather_all(d_list)
        if block:
            # Transform the list of deferred_ids into the final result.
            d.addCallback(process_did_list)
            return d

        # Here we are going to use a _local_ PendingDeferredManager.
        deferred_id = self.pdm.get_deferred_id()
        # This is the deferred we will return to the user.  It fires with the
        # local deferred_id once the primary deferred_ids are known.
        d_to_return = defer.Deferred()

        def do_it(did_list):
            """Fire d_to_return with the local id, then chain the final result."""
            d_to_return.callback(deferred_id)
            return process_did_list(did_list)

        def report_id_anyway(f):
            # If collecting the primary ids failed, still give the caller the
            # local id; previously d_to_return never fired in this case.
            # The failure stays in d, which is saved in self.pdm under that id.
            d_to_return.callback(deferred_id)
            return f

        d.addCallbacks(do_it, report_id_anyway)
        # Now save the deferred to the final result.
        self.pdm.save_pending_deferred(d, deferred_id)
        return d_to_return

    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition `seq` and push partition i to the i-th target as `key`.

        The partitioning is done by the map class ``Map.dists[dist]``.  If
        `flatten` is true, a partition of length 1 is pushed as its only
        element.
        """
        def do_scatter(engines):
            nEngines = len(engines)
            mapObject = Map.dists[dist]()
            d_list = []
            # Push to each engine in non-blocking mode.  Each Deferred fires
            # with a deferred id on the controller.
            for index, engineid in enumerate(engines):
                partition = mapObject.getPartition(seq, index, nEngines)
                if flatten and len(partition) == 1:
                    namespace = {key: partition[0]}
                else:
                    namespace = {key: partition}
                d_list.append(self.push(namespace, targets=engineid, block=False))
            return self._collect_group_results(d_list, 'scatter', block)
        d = self._process_targets(targets)
        d.addCallback(do_scatter)
        return d

    def gather(self, key, dist='b', targets='all', block=True):
        """Pull `key` from every target engine and join the pieces.

        The pieces are joined with ``joinPartitions`` of the map class
        ``Map.dists[dist]``.
        """
        def do_gather(engines):
            mapObject = Map.dists[dist]()
            # Pull from each engine in non-blocking mode.  Each Deferred fires
            # with a deferred id on the controller.
            d_list = [self.pull(key, targets=engineid, block=False) for engineid in engines]
            return self._collect_group_results(d_list, 'gather', block,
                                               mapObject.joinPartitions)
        d = self._process_targets(targets)
        d.addCallback(do_gather)
        return d

    def raw_map(self, func, sequences, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin map.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.

        Raises TypeError if `sequences` is not a list/tuple or `func` is
        neither a function nor a str.  Raises ValueError if `sequences` is
        empty or its sequences differ in length.
        """
        if not isinstance(sequences, (list, tuple)):
            raise TypeError('sequences must be a list or tuple')
        if not sequences:
            raise ValueError('at least one sequence is required')
        max_len = max([len(s) for s in sequences])
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        if isinstance(func, FunctionType):
            d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
            d.addCallback(lambda did: self.get_pending_deferred(did, True))
            sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
        elif isinstance(func, str):
            d = defer.succeed(None)
            sourceToRun = \
                '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
        else:
            raise TypeError("func must be a function or str")

        d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
        d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
        d.addCallback(lambda did: self.get_pending_deferred(did, True))
        d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
        return d

    def map(self, func, *sequences):
        """
        A parallel version of Python's builtin `map` function.

        This method applies a function to sequences of arguments.  It
        follows the same syntax as the builtin `map`.

        This method creates a mapper object by calling `self.mapper` with
        no arguments and then uses that mapper to do the mapping.  See
        the documentation of `mapper` for more details.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, dist='b', targets='all', block=True):
        """
        Create a mapper object that has a `map` method.

        This method returns an object that implements the `IMapper`
        interface.  This method is a factory that is used to control how
        the map happens.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        return MultiEngineMapper(self, dist, targets, block)

    def parallel(self, dist='b', targets='all', block=True):
        """
        A decorator that turns a function into a parallel function.

        This can be used as:

        @parallel()
        def f(x, y):
            ...

        f(range(10), range(10))

        This causes f(0,0), f(1,1), ... to be called in parallel.

        :Parameters:
            dist : str
                What decomposition to use, 'b' is the only one supported
                currently
            targets : str, int, sequence of ints
                Which engines to use for the map
            block : boolean
                Should calls to `map` block or not
        """
        mapper = self.mapper(dist, targets, block)
        return ParallelFunction(mapper)

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineExtras related methods
    #---------------------------------------------------------------------------

    def _transformPullResult(self, pushResult, multitargets, lenKeys):
        """Reshape the result of `pull` for `zip_pull`.

        - With a single target, return that target's value.
        - With several keys, zip the per-target results.
        - Otherwise, return the per-target results as a list.  This also
          covers lenKeys == 0, which previously left `result` unbound.
        """
        if not multitargets:
            return pushResult[0]
        if lenKeys > 1:
            return zip(*pushResult)
        return list(pushResult)

    def zip_pull(self, keys, targets='all', block=True):
        # 'all' counts as multiple targets.  The original relied on
        # len('all') > 1 for this.
        multitargets = targets == 'all' or \
            (not isinstance(targets, int) and len(targets) > 1)
        # A str is one key, not len(str) keys (consistent with pull_function).
        if isinstance(keys, str):
            lenKeys = 1
        else:
            lenKeys = len(keys)
        d = self.pull(keys, targets=targets, block=block)
        if block:
            d.addCallback(self._transformPullResult, multitargets, lenKeys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
        return d

    def run(self, fname, targets='all', block=True):
        """Execute the Python source file `fname` on `targets`.

        The file is compiled locally first.  A compilation error is returned
        as a failed Deferred without contacting the engines.  Errors opening
        or reading the file are raised immediately.
        """
        fileobj = open(fname, 'r')
        try:
            source = fileobj.read()
        finally:
            fileobj.close()
        # if the compilation blows, we get a local error right away
        try:
            compile(source, fname, 'exec')
        except Exception:
            return defer.fail(failure.Failure())
        # Now run the code
        return self.execute(source, targets=targets, block=block)

    #---------------------------------------------------------------------------
    # IBlockingClientAdaptor related methods
    #---------------------------------------------------------------------------

    def adapt_to_blocking_client(self):
        from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
        return IFullBlockingMultiEngineClient(self)