# encoding: utf-8 """ Expose the multiengine controller over the Foolscap network protocol. """ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- import cPickle as pickle from types import FunctionType from zope.interface import Interface, implements from twisted.internet import defer from twisted.python import components, failure try: from foolscap.api import Referenceable except ImportError: from foolscap import Referenceable from IPython.kernel import error from IPython.kernel import map as Map from IPython.kernel.parallelfunction import ParallelFunction from IPython.kernel.mapper import ( MultiEngineMapper, IMultiEngineMapperFactory, IMapper ) from IPython.kernel.twistedutil import gatherBoth from IPython.kernel.multiengine import ( IMultiEngine, IFullSynchronousMultiEngine, ISynchronousMultiEngine) from IPython.kernel.pendingdeferred import PendingDeferredManager from IPython.kernel.pickleutil import ( canDict, canSequence, uncanDict, uncanSequence ) from IPython.kernel.clientinterfaces import ( IFCClientInterfaceProvider, IBlockingClientAdaptor ) # Needed to access the true globals from __main__.__dict__ import __main__ #------------------------------------------------------------------------------- # The Controller side of things #------------------------------------------------------------------------------- def packageResult(wrappedMethod): def wrappedPackageResult(self, *args, **kwargs): d = wrappedMethod(self, *args, **kwargs) d.addCallback(self.packageSuccess) d.addErrback(self.packageFailure) return d return wrappedPackageResult class IFCSynchronousMultiEngine(Interface): """Foolscap interface to `ISynchronousMultiEngine`. The methods in this interface are similar to those of `ISynchronousMultiEngine`, but their arguments and return values are pickled if they are not already simple Python types that can be send over XML-RPC. See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for documentation about the methods. Most methods in this interface act like the `ISynchronousMultiEngine` versions and can be called in blocking or non-blocking mode. """ pass class FCSynchronousMultiEngineFromMultiEngine(Referenceable): """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`. """ implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider) addSlash = True def __init__(self, multiengine): # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving # it. This allow this class to do two adaptation steps. self.smultiengine = ISynchronousMultiEngine(multiengine) self._deferredIDCallbacks = {} #--------------------------------------------------------------------------- # Non interface methods #--------------------------------------------------------------------------- def packageFailure(self, f): f.cleanFailure() return self.packageSuccess(f) def packageSuccess(self, obj): serial = pickle.dumps(obj, 2) return serial #--------------------------------------------------------------------------- # Things related to PendingDeferredManager #--------------------------------------------------------------------------- @packageResult def remote_get_pending_deferred(self, deferredID, block): d = self.smultiengine.get_pending_deferred(deferredID, block) try: callback = self._deferredIDCallbacks.pop(deferredID) except KeyError: callback = None if callback is not None: d.addCallback(callback[0], *callback[1], **callback[2]) return d @packageResult def remote_clear_pending_deferreds(self): return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds) def _addDeferredIDCallback(self, did, callback, *args, **kwargs): self._deferredIDCallbacks[did] = (callback, args, kwargs) return did #--------------------------------------------------------------------------- # IEngineMultiplexer related methods #--------------------------------------------------------------------------- @packageResult def remote_execute(self, lines, targets, block): return self.smultiengine.execute(lines, targets=targets, block=block) @packageResult def remote_push(self, binaryNS, targets, block): try: namespace = pickle.loads(binaryNS) except: d = defer.fail(failure.Failure()) else: d = self.smultiengine.push(namespace, targets=targets, block=block) return d @packageResult def remote_pull(self, keys, targets, block): d = self.smultiengine.pull(keys, targets=targets, block=block) return d @packageResult def remote_push_function(self, binaryNS, targets, block): try: namespace = pickle.loads(binaryNS) except: d = defer.fail(failure.Failure()) else: namespace = uncanDict(namespace) d = self.smultiengine.push_function(namespace, targets=targets, block=block) return d def _canMultipleKeys(self, result): return [canSequence(r) for r in result] @packageResult def remote_pull_function(self, keys, targets, block): def can_functions(r, keys): if len(keys)==1 or isinstance(keys, str): result = canSequence(r) elif len(keys)>1: result = [canSequence(s) for s in r] return result d = self.smultiengine.pull_function(keys, targets=targets, block=block) if block: d.addCallback(can_functions, keys) else: d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys)) return d @packageResult def remote_push_serialized(self, binaryNS, targets, block): try: namespace = pickle.loads(binaryNS) except: d = defer.fail(failure.Failure()) else: d = self.smultiengine.push_serialized(namespace, targets=targets, block=block) return d @packageResult def remote_pull_serialized(self, keys, targets, block): d = self.smultiengine.pull_serialized(keys, targets=targets, block=block) return d @packageResult def remote_get_result(self, i, targets, block): if i == 'None': i = None return self.smultiengine.get_result(i, targets=targets, block=block) @packageResult def remote_reset(self, targets, block): return self.smultiengine.reset(targets=targets, block=block) @packageResult def remote_keys(self, targets, block): return self.smultiengine.keys(targets=targets, block=block) @packageResult def remote_kill(self, controller, targets, block): return self.smultiengine.kill(controller, targets=targets, block=block) @packageResult def remote_clear_queue(self, targets, block): return self.smultiengine.clear_queue(targets=targets, block=block) @packageResult def remote_queue_status(self, targets, block): return self.smultiengine.queue_status(targets=targets, block=block) @packageResult def remote_set_properties(self, binaryNS, targets, block): try: ns = pickle.loads(binaryNS) except: d = defer.fail(failure.Failure()) else: d = self.smultiengine.set_properties(ns, targets=targets, block=block) return d @packageResult def remote_get_properties(self, keys, targets, block): if keys=='None': keys=None return self.smultiengine.get_properties(keys, targets=targets, block=block) @packageResult def remote_has_properties(self, keys, targets, block): return self.smultiengine.has_properties(keys, targets=targets, block=block) @packageResult def remote_del_properties(self, keys, targets, block): return self.smultiengine.del_properties(keys, targets=targets, block=block) @packageResult def remote_clear_properties(self, targets, block): return self.smultiengine.clear_properties(targets=targets, block=block) #--------------------------------------------------------------------------- # IMultiEngine related methods #--------------------------------------------------------------------------- def remote_get_ids(self): """Get the ids of the registered engines. This method always blocks. """ return self.smultiengine.get_ids() #--------------------------------------------------------------------------- # IFCClientInterfaceProvider related methods #--------------------------------------------------------------------------- def remote_get_client_name(self): return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient' # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a # two phase adaptation. components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine, IMultiEngine, IFCSynchronousMultiEngine) #------------------------------------------------------------------------------- # The Client side of things #------------------------------------------------------------------------------- class FCFullSynchronousMultiEngineClient(object): implements( IFullSynchronousMultiEngine, IBlockingClientAdaptor, IMultiEngineMapperFactory, IMapper ) def __init__(self, remote_reference): self.remote_reference = remote_reference self._deferredIDCallbacks = {} # This class manages some pending deferreds through this instance. This # is required for methods like gather/scatter as it enables us to # create our own pending deferreds for composite operations. self.pdm = PendingDeferredManager() #--------------------------------------------------------------------------- # Non interface methods #--------------------------------------------------------------------------- def unpackage(self, r): return pickle.loads(r) #--------------------------------------------------------------------------- # Things related to PendingDeferredManager #--------------------------------------------------------------------------- def get_pending_deferred(self, deferredID, block=True): # Because we are managing some pending deferreds locally (through # self.pdm) and some remotely (on the controller), we first try the # local one and then the remote one. if self.pdm.quick_has_id(deferredID): d = self.pdm.get_pending_deferred(deferredID, block) return d else: d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block) d.addCallback(self.unpackage) try: callback = self._deferredIDCallbacks.pop(deferredID) except KeyError: callback = None if callback is not None: d.addCallback(callback[0], *callback[1], **callback[2]) return d def clear_pending_deferreds(self): # This clear both the local (self.pdm) and remote pending deferreds self.pdm.clear_pending_deferreds() d2 = self.remote_reference.callRemote('clear_pending_deferreds') d2.addCallback(self.unpackage) return d2 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): self._deferredIDCallbacks[did] = (callback, args, kwargs) return did #--------------------------------------------------------------------------- # IEngineMultiplexer related methods #--------------------------------------------------------------------------- def execute(self, lines, targets='all', block=True): d = self.remote_reference.callRemote('execute', lines, targets, block) d.addCallback(self.unpackage) return d def push(self, namespace, targets='all', block=True): serial = pickle.dumps(namespace, 2) d = self.remote_reference.callRemote('push', serial, targets, block) d.addCallback(self.unpackage) return d def pull(self, keys, targets='all', block=True): d = self.remote_reference.callRemote('pull', keys, targets, block) d.addCallback(self.unpackage) return d def push_function(self, namespace, targets='all', block=True): cannedNamespace = canDict(namespace) serial = pickle.dumps(cannedNamespace, 2) d = self.remote_reference.callRemote('push_function', serial, targets, block) d.addCallback(self.unpackage) return d def pull_function(self, keys, targets='all', block=True): def uncan_functions(r, keys): if len(keys)==1 or isinstance(keys, str): return uncanSequence(r) elif len(keys)>1: return [uncanSequence(s) for s in r] d = self.remote_reference.callRemote('pull_function', keys, targets, block) if block: d.addCallback(self.unpackage) d.addCallback(uncan_functions, keys) else: d.addCallback(self.unpackage) d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys)) return d def push_serialized(self, namespace, targets='all', block=True): cannedNamespace = canDict(namespace) serial = pickle.dumps(cannedNamespace, 2) d = self.remote_reference.callRemote('push_serialized', serial, targets, block) d.addCallback(self.unpackage) return d def pull_serialized(self, keys, targets='all', block=True): d = self.remote_reference.callRemote('pull_serialized', keys, targets, block) d.addCallback(self.unpackage) return d def get_result(self, i=None, targets='all', block=True): if i is None: # This is because None cannot be marshalled by xml-rpc i = 'None' d = self.remote_reference.callRemote('get_result', i, targets, block) d.addCallback(self.unpackage) return d def reset(self, targets='all', block=True): d = self.remote_reference.callRemote('reset', targets, block) d.addCallback(self.unpackage) return d def keys(self, targets='all', block=True): d = self.remote_reference.callRemote('keys', targets, block) d.addCallback(self.unpackage) return d def kill(self, controller=False, targets='all', block=True): d = self.remote_reference.callRemote('kill', controller, targets, block) d.addCallback(self.unpackage) return d def clear_queue(self, targets='all', block=True): d = self.remote_reference.callRemote('clear_queue', targets, block) d.addCallback(self.unpackage) return d def queue_status(self, targets='all', block=True): d = self.remote_reference.callRemote('queue_status', targets, block) d.addCallback(self.unpackage) return d def set_properties(self, properties, targets='all', block=True): serial = pickle.dumps(properties, 2) d = self.remote_reference.callRemote('set_properties', serial, targets, block) d.addCallback(self.unpackage) return d def get_properties(self, keys=None, targets='all', block=True): if keys==None: keys='None' d = self.remote_reference.callRemote('get_properties', keys, targets, block) d.addCallback(self.unpackage) return d def has_properties(self, keys, targets='all', block=True): d = self.remote_reference.callRemote('has_properties', keys, targets, block) d.addCallback(self.unpackage) return d def del_properties(self, keys, targets='all', block=True): d = self.remote_reference.callRemote('del_properties', keys, targets, block) d.addCallback(self.unpackage) return d def clear_properties(self, targets='all', block=True): d = self.remote_reference.callRemote('clear_properties', targets, block) d.addCallback(self.unpackage) return d #--------------------------------------------------------------------------- # IMultiEngine related methods #--------------------------------------------------------------------------- def get_ids(self): d = self.remote_reference.callRemote('get_ids') return d #--------------------------------------------------------------------------- # ISynchronousMultiEngineCoordinator related methods #--------------------------------------------------------------------------- def _process_targets(self, targets): def create_targets(ids): if isinstance(targets, int): engines = [targets] elif targets=='all': engines = ids elif isinstance(targets, (list, tuple)): engines = targets for t in engines: if not t in ids: raise error.InvalidEngineID("engine with id %r does not exist"%t) return engines d = self.get_ids() d.addCallback(create_targets) return d def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True): # Note: scatter and gather handle pending deferreds locally through self.pdm. # This enables us to collect a bunch fo deferred ids and make a secondary # deferred id that corresponds to the entire group. This logic is extremely # difficult to get right though. def do_scatter(engines): nEngines = len(engines) mapClass = Map.dists[dist] mapObject = mapClass() d_list = [] # Loop through and push to each engine in non-blocking mode. # This returns a set of deferreds to deferred_ids for index, engineid in enumerate(engines): partition = mapObject.getPartition(seq, index, nEngines) if flatten and len(partition) == 1: d = self.push({key: partition[0]}, targets=engineid, block=False) else: d = self.push({key: partition}, targets=engineid, block=False) d_list.append(d) # Collect the deferred to deferred_ids d = gatherBoth(d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0) # Now d has a list of deferred_ids or Failures coming d.addCallback(error.collect_exceptions, 'scatter') def process_did_list(did_list): """Turn a list of deferred_ids into a final result or failure.""" new_d_list = [self.get_pending_deferred(did, True) for did in did_list] final_d = gatherBoth(new_d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0) final_d.addCallback(error.collect_exceptions, 'scatter') final_d.addCallback(lambda lop: [i[0] for i in lop]) return final_d # Now, depending on block, we need to handle the list deferred_ids # coming down the pipe diferently. if block: # If we are blocking register a callback that will transform the # list of deferred_ids into the final result. d.addCallback(process_did_list) return d else: # Here we are going to use a _local_ PendingDeferredManager. deferred_id = self.pdm.get_deferred_id() # This is the deferred we will return to the user that will fire # with the local deferred_id AFTER we have received the list of # primary deferred_ids d_to_return = defer.Deferred() def do_it(did_list): """Produce a deferred to the final result, but first fire the deferred we will return to the user that has the local deferred id.""" d_to_return.callback(deferred_id) return process_did_list(did_list) d.addCallback(do_it) # Now save the deferred to the final result self.pdm.save_pending_deferred(d, deferred_id) return d_to_return d = self._process_targets(targets) d.addCallback(do_scatter) return d def gather(self, key, dist='b', targets='all', block=True): # Note: scatter and gather handle pending deferreds locally through self.pdm. # This enables us to collect a bunch fo deferred ids and make a secondary # deferred id that corresponds to the entire group. This logic is extremely # difficult to get right though. def do_gather(engines): nEngines = len(engines) mapClass = Map.dists[dist] mapObject = mapClass() d_list = [] # Loop through and push to each engine in non-blocking mode. # This returns a set of deferreds to deferred_ids for index, engineid in enumerate(engines): d = self.pull(key, targets=engineid, block=False) d_list.append(d) # Collect the deferred to deferred_ids d = gatherBoth(d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0) # Now d has a list of deferred_ids or Failures coming d.addCallback(error.collect_exceptions, 'scatter') def process_did_list(did_list): """Turn a list of deferred_ids into a final result or failure.""" new_d_list = [self.get_pending_deferred(did, True) for did in did_list] final_d = gatherBoth(new_d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0) final_d.addCallback(error.collect_exceptions, 'gather') final_d.addCallback(lambda lop: [i[0] for i in lop]) final_d.addCallback(mapObject.joinPartitions) return final_d # Now, depending on block, we need to handle the list deferred_ids # coming down the pipe diferently. if block: # If we are blocking register a callback that will transform the # list of deferred_ids into the final result. d.addCallback(process_did_list) return d else: # Here we are going to use a _local_ PendingDeferredManager. deferred_id = self.pdm.get_deferred_id() # This is the deferred we will return to the user that will fire # with the local deferred_id AFTER we have received the list of # primary deferred_ids d_to_return = defer.Deferred() def do_it(did_list): """Produce a deferred to the final result, but first fire the deferred we will return to the user that has the local deferred id.""" d_to_return.callback(deferred_id) return process_did_list(did_list) d.addCallback(do_it) # Now save the deferred to the final result self.pdm.save_pending_deferred(d, deferred_id) return d_to_return d = self._process_targets(targets) d.addCallback(do_gather) return d def raw_map(self, func, sequences, dist='b', targets='all', block=True): """ A parallelized version of Python's builtin map. This has a slightly different syntax than the builtin `map`. This is needed because we need to have keyword arguments and thus can't use *args to capture all the sequences. Instead, they must be passed in a list or tuple. raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) Most users will want to use parallel functions or the `mapper` and `map` methods for an API that follows that of the builtin `map`. """ if not isinstance(sequences, (list, tuple)): raise TypeError('sequences must be a list or tuple') max_len = max(len(s) for s in sequences) for s in sequences: if len(s)!=max_len: raise ValueError('all sequences must have equal length') if isinstance(func, FunctionType): d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False) d.addCallback(lambda did: self.get_pending_deferred(did, True)) sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))' elif isinstance(func, str): d = defer.succeed(None) sourceToRun = \ '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func else: raise TypeError("func must be a function or str") d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets)) d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False)) d.addCallback(lambda did: self.get_pending_deferred(did, True)) d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block)) return d def map(self, func, *sequences): """ A parallel version of Python's builtin `map` function. This method applies a function to sequences of arguments. It follows the same syntax as the builtin `map`. This method creates a mapper objects by calling `self.mapper` with no arguments and then uses that mapper to do the mapping. See the documentation of `mapper` for more details. """ return self.mapper().map(func, *sequences) def mapper(self, dist='b', targets='all', block=True): """ Create a mapper object that has a `map` method. This method returns an object that implements the `IMapper` interface. This method is a factory that is used to control how the map happens. :Parameters: dist : str What decomposition to use, 'b' is the only one supported currently targets : str, int, sequence of ints Which engines to use for the map block : boolean Should calls to `map` block or not """ return MultiEngineMapper(self, dist, targets, block) def parallel(self, dist='b', targets='all', block=True): """ A decorator that turns a function into a parallel function. This can be used as: @parallel() def f(x, y) ... f(range(10), range(10)) This causes f(0,0), f(1,1), ... to be called in parallel. :Parameters: dist : str What decomposition to use, 'b' is the only one supported currently targets : str, int, sequence of ints Which engines to use for the map block : boolean Should calls to `map` block or not """ mapper = self.mapper(dist, targets, block) pf = ParallelFunction(mapper) return pf #--------------------------------------------------------------------------- # ISynchronousMultiEngineExtras related methods #--------------------------------------------------------------------------- def _transformPullResult(self, pushResult, multitargets, lenKeys): if not multitargets: result = pushResult[0] elif lenKeys > 1: result = zip(*pushResult) elif lenKeys is 1: result = list(pushResult) return result def zip_pull(self, keys, targets='all', block=True): multitargets = not isinstance(targets, int) and len(targets) > 1 lenKeys = len(keys) d = self.pull(keys, targets=targets, block=block) if block: d.addCallback(self._transformPullResult, multitargets, lenKeys) else: d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys)) return d def run(self, fname, targets='all', block=True): fileobj = open(fname,'r') source = fileobj.read() fileobj.close() # if the compilation blows, we get a local error right away try: code = compile(source,fname,'exec') except: return defer.fail(failure.Failure()) # Now run the code d = self.execute(source, targets=targets, block=block) return d #--------------------------------------------------------------------------- # IBlockingClientAdaptor related methods #--------------------------------------------------------------------------- def adapt_to_blocking_client(self): from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient return IFullBlockingMultiEngineClient(self)