##// END OF EJS Templates
Merging from trunk.
Merging from trunk.

File last commit:

r1234:52b55407
r1268:bb43f0d6 merge
Show More
multienginefc.py
668 lines | 27.1 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
Expose the multiengine controller over the Foolscap network protocol.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
from types import FunctionType
from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure, log
from foolscap import Referenceable
from IPython.kernel import error
from IPython.kernel.util import printer
from IPython.kernel import map as Map
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.multiengine import (MultiEngine,
IMultiEngine,
IFullSynchronousMultiEngine,
ISynchronousMultiEngine)
from IPython.kernel.multiengineclient import wrapResultList
from IPython.kernel.pendingdeferred import PendingDeferredManager
from IPython.kernel.pickleutil import (can, canDict,
canSequence, uncan, uncanDict, uncanSequence)
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
# Needed to access the true globals from __main__.__dict__
import __main__
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
def packageResult(wrappedMethod):
def wrappedPackageResult(self, *args, **kwargs):
d = wrappedMethod(self, *args, **kwargs)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
return wrappedPackageResult
class IFCSynchronousMultiEngine(Interface):
"""Foolscap interface to `ISynchronousMultiEngine`.
The methods in this interface are similar to those of
`ISynchronousMultiEngine`, but their arguments and return values are pickled
if they are not already simple Python types that can be send over XML-RPC.
See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
documentation about the methods.
Most methods in this interface act like the `ISynchronousMultiEngine`
versions and can be called in blocking or non-blocking mode.
"""
pass
class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
"""Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
"""
implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
addSlash = True
def __init__(self, multiengine):
# Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
# it. This allow this class to do two adaptation steps.
self.smultiengine = ISynchronousMultiEngine(multiengine)
self._deferredIDCallbacks = {}
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def packageFailure(self, f):
f.cleanFailure()
return self.packageSuccess(f)
def packageSuccess(self, obj):
serial = pickle.dumps(obj, 2)
return serial
#---------------------------------------------------------------------------
# Things related to PendingDeferredManager
#---------------------------------------------------------------------------
@packageResult
def remote_get_pending_deferred(self, deferredID, block):
d = self.smultiengine.get_pending_deferred(deferredID, block)
try:
callback = self._deferredIDCallbacks.pop(deferredID)
except KeyError:
callback = None
if callback is not None:
d.addCallback(callback[0], *callback[1], **callback[2])
return d
@packageResult
def remote_clear_pending_deferreds(self):
return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
self._deferredIDCallbacks[did] = (callback, args, kwargs)
return did
#---------------------------------------------------------------------------
# IEngineMultiplexer related methods
#---------------------------------------------------------------------------
@packageResult
def remote_execute(self, lines, targets, block):
return self.smultiengine.execute(lines, targets=targets, block=block)
@packageResult
def remote_push(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.push(namespace, targets=targets, block=block)
return d
@packageResult
def remote_pull(self, keys, targets, block):
d = self.smultiengine.pull(keys, targets=targets, block=block)
return d
@packageResult
def remote_push_function(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
namespace = uncanDict(namespace)
d = self.smultiengine.push_function(namespace, targets=targets, block=block)
return d
def _canMultipleKeys(self, result):
return [canSequence(r) for r in result]
@packageResult
def remote_pull_function(self, keys, targets, block):
def can_functions(r, keys):
if len(keys)==1 or isinstance(keys, str):
result = canSequence(r)
elif len(keys)>1:
result = [canSequence(s) for s in r]
return result
d = self.smultiengine.pull_function(keys, targets=targets, block=block)
if block:
d.addCallback(can_functions, keys)
else:
d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
return d
@packageResult
def remote_push_serialized(self, binaryNS, targets, block):
try:
namespace = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
return d
@packageResult
def remote_pull_serialized(self, keys, targets, block):
d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
return d
@packageResult
def remote_get_result(self, i, targets, block):
if i == 'None':
i = None
return self.smultiengine.get_result(i, targets=targets, block=block)
@packageResult
def remote_reset(self, targets, block):
return self.smultiengine.reset(targets=targets, block=block)
@packageResult
def remote_keys(self, targets, block):
return self.smultiengine.keys(targets=targets, block=block)
@packageResult
def remote_kill(self, controller, targets, block):
return self.smultiengine.kill(controller, targets=targets, block=block)
@packageResult
def remote_clear_queue(self, targets, block):
return self.smultiengine.clear_queue(targets=targets, block=block)
@packageResult
def remote_queue_status(self, targets, block):
return self.smultiengine.queue_status(targets=targets, block=block)
@packageResult
def remote_set_properties(self, binaryNS, targets, block):
try:
ns = pickle.loads(binaryNS)
except:
d = defer.fail(failure.Failure())
else:
d = self.smultiengine.set_properties(ns, targets=targets, block=block)
return d
@packageResult
def remote_get_properties(self, keys, targets, block):
if keys=='None':
keys=None
return self.smultiengine.get_properties(keys, targets=targets, block=block)
@packageResult
def remote_has_properties(self, keys, targets, block):
return self.smultiengine.has_properties(keys, targets=targets, block=block)
@packageResult
def remote_del_properties(self, keys, targets, block):
return self.smultiengine.del_properties(keys, targets=targets, block=block)
@packageResult
def remote_clear_properties(self, targets, block):
return self.smultiengine.clear_properties(targets=targets, block=block)
#---------------------------------------------------------------------------
# IMultiEngine related methods
#---------------------------------------------------------------------------
def remote_get_ids(self):
"""Get the ids of the registered engines.
This method always blocks.
"""
return self.smultiengine.get_ids()
#---------------------------------------------------------------------------
# IFCClientInterfaceProvider related methods
#---------------------------------------------------------------------------
def remote_get_client_name(self):
return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
# The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
# `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
# two phase adaptation.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
IMultiEngine, IFCSynchronousMultiEngine)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCFullSynchronousMultiEngineClient(object):
implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
def __init__(self, remote_reference):
self.remote_reference = remote_reference
self._deferredIDCallbacks = {}
# This class manages some pending deferreds through this instance. This
# is required for methods like gather/scatter as it enables us to
# create our own pending deferreds for composite operations.
self.pdm = PendingDeferredManager()
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def unpackage(self, r):
return pickle.loads(r)
#---------------------------------------------------------------------------
# Things related to PendingDeferredManager
#---------------------------------------------------------------------------
def get_pending_deferred(self, deferredID, block=True):
# Because we are managing some pending deferreds locally (through
# self.pdm) and some remotely (on the controller), we first try the
# local one and then the remote one.
if self.pdm.quick_has_id(deferredID):
d = self.pdm.get_pending_deferred(deferredID, block)
return d
else:
d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
d.addCallback(self.unpackage)
try:
callback = self._deferredIDCallbacks.pop(deferredID)
except KeyError:
callback = None
if callback is not None:
d.addCallback(callback[0], *callback[1], **callback[2])
return d
def clear_pending_deferreds(self):
# This clear both the local (self.pdm) and remote pending deferreds
self.pdm.clear_pending_deferreds()
d2 = self.remote_reference.callRemote('clear_pending_deferreds')
d2.addCallback(self.unpackage)
return d2
def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
self._deferredIDCallbacks[did] = (callback, args, kwargs)
return did
#---------------------------------------------------------------------------
# IEngineMultiplexer related methods
#---------------------------------------------------------------------------
def execute(self, lines, targets='all', block=True):
d = self.remote_reference.callRemote('execute', lines, targets, block)
d.addCallback(self.unpackage)
return d
def push(self, namespace, targets='all', block=True):
serial = pickle.dumps(namespace, 2)
d = self.remote_reference.callRemote('push', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('pull', keys, targets, block)
d.addCallback(self.unpackage)
return d
def push_function(self, namespace, targets='all', block=True):
cannedNamespace = canDict(namespace)
serial = pickle.dumps(cannedNamespace, 2)
d = self.remote_reference.callRemote('push_function', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull_function(self, keys, targets='all', block=True):
def uncan_functions(r, keys):
if len(keys)==1 or isinstance(keys, str):
return uncanSequence(r)
elif len(keys)>1:
return [uncanSequence(s) for s in r]
d = self.remote_reference.callRemote('pull_function', keys, targets, block)
if block:
d.addCallback(self.unpackage)
d.addCallback(uncan_functions, keys)
else:
d.addCallback(self.unpackage)
d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
return d
def push_serialized(self, namespace, targets='all', block=True):
cannedNamespace = canDict(namespace)
serial = pickle.dumps(cannedNamespace, 2)
d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
d.addCallback(self.unpackage)
return d
def pull_serialized(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
d.addCallback(self.unpackage)
return d
def get_result(self, i=None, targets='all', block=True):
if i is None: # This is because None cannot be marshalled by xml-rpc
i = 'None'
d = self.remote_reference.callRemote('get_result', i, targets, block)
d.addCallback(self.unpackage)
return d
def reset(self, targets='all', block=True):
d = self.remote_reference.callRemote('reset', targets, block)
d.addCallback(self.unpackage)
return d
def keys(self, targets='all', block=True):
d = self.remote_reference.callRemote('keys', targets, block)
d.addCallback(self.unpackage)
return d
def kill(self, controller=False, targets='all', block=True):
d = self.remote_reference.callRemote('kill', controller, targets, block)
d.addCallback(self.unpackage)
return d
def clear_queue(self, targets='all', block=True):
d = self.remote_reference.callRemote('clear_queue', targets, block)
d.addCallback(self.unpackage)
return d
def queue_status(self, targets='all', block=True):
d = self.remote_reference.callRemote('queue_status', targets, block)
d.addCallback(self.unpackage)
return d
def set_properties(self, properties, targets='all', block=True):
serial = pickle.dumps(properties, 2)
d = self.remote_reference.callRemote('set_properties', serial, targets, block)
d.addCallback(self.unpackage)
return d
def get_properties(self, keys=None, targets='all', block=True):
if keys==None:
keys='None'
d = self.remote_reference.callRemote('get_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def has_properties(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('has_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def del_properties(self, keys, targets='all', block=True):
d = self.remote_reference.callRemote('del_properties', keys, targets, block)
d.addCallback(self.unpackage)
return d
def clear_properties(self, targets='all', block=True):
d = self.remote_reference.callRemote('clear_properties', targets, block)
d.addCallback(self.unpackage)
return d
#---------------------------------------------------------------------------
# IMultiEngine related methods
#---------------------------------------------------------------------------
def get_ids(self):
d = self.remote_reference.callRemote('get_ids')
return d
#---------------------------------------------------------------------------
# ISynchronousMultiEngineCoordinator related methods
#---------------------------------------------------------------------------
def _process_targets(self, targets):
def create_targets(ids):
if isinstance(targets, int):
engines = [targets]
elif targets=='all':
engines = ids
elif isinstance(targets, (list, tuple)):
engines = targets
for t in engines:
if not t in ids:
raise error.InvalidEngineID("engine with id %r does not exist"%t)
return engines
d = self.get_ids()
d.addCallback(create_targets)
return d
def scatter(self, key, seq, style='basic', flatten=False, targets='all', block=True):
# Note: scatter and gather handle pending deferreds locally through self.pdm.
# This enables us to collect a bunch fo deferred ids and make a secondary
# deferred id that corresponds to the entire group. This logic is extremely
# difficult to get right though.
def do_scatter(engines):
nEngines = len(engines)
mapClass = Map.styles[style]
mapObject = mapClass()
d_list = []
# Loop through and push to each engine in non-blocking mode.
# This returns a set of deferreds to deferred_ids
for index, engineid in enumerate(engines):
partition = mapObject.getPartition(seq, index, nEngines)
if flatten and len(partition) == 1:
d = self.push({key: partition[0]}, targets=engineid, block=False)
else:
d = self.push({key: partition}, targets=engineid, block=False)
d_list.append(d)
# Collect the deferred to deferred_ids
d = gatherBoth(d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
# Now d has a list of deferred_ids or Failures coming
d.addCallback(error.collect_exceptions, 'scatter')
def process_did_list(did_list):
"""Turn a list of deferred_ids into a final result or failure."""
new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
final_d = gatherBoth(new_d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
final_d.addCallback(error.collect_exceptions, 'scatter')
final_d.addCallback(lambda lop: [i[0] for i in lop])
return final_d
# Now, depending on block, we need to handle the list deferred_ids
# coming down the pipe diferently.
if block:
# If we are blocking register a callback that will transform the
# list of deferred_ids into the final result.
d.addCallback(process_did_list)
return d
else:
# Here we are going to use a _local_ PendingDeferredManager.
deferred_id = self.pdm.get_deferred_id()
# This is the deferred we will return to the user that will fire
# with the local deferred_id AFTER we have received the list of
# primary deferred_ids
d_to_return = defer.Deferred()
def do_it(did_list):
"""Produce a deferred to the final result, but first fire the
deferred we will return to the user that has the local
deferred id."""
d_to_return.callback(deferred_id)
return process_did_list(did_list)
d.addCallback(do_it)
# Now save the deferred to the final result
self.pdm.save_pending_deferred(d, deferred_id)
return d_to_return
d = self._process_targets(targets)
d.addCallback(do_scatter)
return d
def gather(self, key, style='basic', targets='all', block=True):
# Note: scatter and gather handle pending deferreds locally through self.pdm.
# This enables us to collect a bunch fo deferred ids and make a secondary
# deferred id that corresponds to the entire group. This logic is extremely
# difficult to get right though.
def do_gather(engines):
nEngines = len(engines)
mapClass = Map.styles[style]
mapObject = mapClass()
d_list = []
# Loop through and push to each engine in non-blocking mode.
# This returns a set of deferreds to deferred_ids
for index, engineid in enumerate(engines):
d = self.pull(key, targets=engineid, block=False)
d_list.append(d)
# Collect the deferred to deferred_ids
d = gatherBoth(d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
# Now d has a list of deferred_ids or Failures coming
d.addCallback(error.collect_exceptions, 'scatter')
def process_did_list(did_list):
"""Turn a list of deferred_ids into a final result or failure."""
new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
final_d = gatherBoth(new_d_list,
fireOnOneErrback=0,
consumeErrors=1,
logErrors=0)
final_d.addCallback(error.collect_exceptions, 'gather')
final_d.addCallback(lambda lop: [i[0] for i in lop])
final_d.addCallback(mapObject.joinPartitions)
return final_d
# Now, depending on block, we need to handle the list deferred_ids
# coming down the pipe diferently.
if block:
# If we are blocking register a callback that will transform the
# list of deferred_ids into the final result.
d.addCallback(process_did_list)
return d
else:
# Here we are going to use a _local_ PendingDeferredManager.
deferred_id = self.pdm.get_deferred_id()
# This is the deferred we will return to the user that will fire
# with the local deferred_id AFTER we have received the list of
# primary deferred_ids
d_to_return = defer.Deferred()
def do_it(did_list):
"""Produce a deferred to the final result, but first fire the
deferred we will return to the user that has the local
deferred id."""
d_to_return.callback(deferred_id)
return process_did_list(did_list)
d.addCallback(do_it)
# Now save the deferred to the final result
self.pdm.save_pending_deferred(d, deferred_id)
return d_to_return
d = self._process_targets(targets)
d.addCallback(do_gather)
return d
def map(self, func, seq, style='basic', targets='all', block=True):
d_list = []
if isinstance(func, FunctionType):
d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
d.addCallback(lambda did: self.get_pending_deferred(did, True))
sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)'
elif isinstance(func, str):
d = defer.succeed(None)
sourceToRun = \
'_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func
else:
raise TypeError("func must be a function or str")
d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets))
d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
d.addCallback(lambda did: self.get_pending_deferred(did, True))
d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets, block=block))
return d
#---------------------------------------------------------------------------
# ISynchronousMultiEngineExtras related methods
#---------------------------------------------------------------------------
def _transformPullResult(self, pushResult, multitargets, lenKeys):
if not multitargets:
result = pushResult[0]
elif lenKeys > 1:
result = zip(*pushResult)
elif lenKeys is 1:
result = list(pushResult)
return result
def zip_pull(self, keys, targets='all', block=True):
multitargets = not isinstance(targets, int) and len(targets) > 1
lenKeys = len(keys)
d = self.pull(keys, targets=targets, block=block)
if block:
d.addCallback(self._transformPullResult, multitargets, lenKeys)
else:
d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
return d
def run(self, fname, targets='all', block=True):
fileobj = open(fname,'r')
source = fileobj.read()
fileobj.close()
# if the compilation blows, we get a local error right away
try:
code = compile(source,fname,'exec')
except:
return defer.fail(failure.Failure())
# Now run the code
d = self.execute(source, targets=targets, block=block)
return d
#---------------------------------------------------------------------------
# IBlockingClientAdaptor related methods
#---------------------------------------------------------------------------
def adapt_to_blocking_client(self):
from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
return IFullBlockingMultiEngineClient(self)