# encoding: utf-8 # -*- test-case-name: IPython.kernel.test.test_pendingdeferred -*- """Classes to manage pending Deferreds. A pending deferred is a deferred that may or may not have fired. This module is useful for taking a class whose methods return deferreds and wrapping it to provide API that keeps track of those deferreds for later retrieval. See the tests for examples of its usage. """ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- from twisted.application import service from twisted.internet import defer, reactor from twisted.python import log, components, failure from zope.interface import Interface, implements, Attribute from IPython.kernel.twistedutil import gatherBoth from IPython.kernel import error from IPython.external import guid from IPython.utils import growl class PendingDeferredManager(object): """A class to track pending deferreds. To track a pending deferred, the user of this class must first get a deferredID by calling `get_next_deferred_id`. Then the user calls `save_pending_deferred` passing that id and the deferred to be tracked. To later retrieve it, the user calls `get_pending_deferred` passing the id. """ def __init__(self): """Manage pending deferreds.""" self.results = {} # Populated when results are ready self.deferred_ids = [] # List of deferred ids I am managing self.deferreds_to_callback = {} # dict of lists of deferreds to callback def get_deferred_id(self): return guid.generate() def quick_has_id(self, deferred_id): return deferred_id in self.deferred_ids def _save_result(self, result, deferred_id): if self.quick_has_id(deferred_id): self.results[deferred_id] = result self._trigger_callbacks(deferred_id) def _trigger_callbacks(self, deferred_id): # Go through and call the waiting callbacks result = self.results.get(deferred_id) if result is not None: # Only trigger if there is a result try: d = self.deferreds_to_callback.pop(deferred_id) except KeyError: d = None if d is not None: if isinstance(result, failure.Failure): d.errback(result) else: d.callback(result) self.delete_pending_deferred(deferred_id) def save_pending_deferred(self, d, deferred_id=None): """Save the result of a deferred for later retrieval. This works even if the deferred has not fired. Only callbacks and errbacks applied to d before this method is called will be called no the final result. """ if deferred_id is None: deferred_id = self.get_deferred_id() self.deferred_ids.append(deferred_id) d.addBoth(self._save_result, deferred_id) return deferred_id def _protected_del(self, key, container): try: del container[key] except Exception: pass def delete_pending_deferred(self, deferred_id): """Remove a deferred I am tracking and add a null Errback. :Parameters: deferredID : str The id of a deferred that I am tracking. """ if self.quick_has_id(deferred_id): # First go through a errback any deferreds that are still waiting d = self.deferreds_to_callback.get(deferred_id) if d is not None: d.errback(failure.Failure(error.AbortedPendingDeferredError("pending deferred has been deleted: %r"%deferred_id))) # Now delete all references to this deferred_id ind = self.deferred_ids.index(deferred_id) self._protected_del(ind, self.deferred_ids) self._protected_del(deferred_id, self.deferreds_to_callback) self._protected_del(deferred_id, self.results) else: raise error.InvalidDeferredID('invalid deferred_id: %r' % deferred_id) def clear_pending_deferreds(self): """Remove all the deferreds I am tracking.""" for did in self.deferred_ids: self.delete_pending_deferred(did) def _delete_and_pass_through(self, r, deferred_id): self.delete_pending_deferred(deferred_id) return r def get_pending_deferred(self, deferred_id, block): if not self.quick_has_id(deferred_id) or self.deferreds_to_callback.get(deferred_id) is not None: return defer.fail(failure.Failure(error.InvalidDeferredID('invalid deferred_id: %r' + deferred_id))) result = self.results.get(deferred_id) if result is not None: self.delete_pending_deferred(deferred_id) if isinstance(result, failure.Failure): return defer.fail(result) else: return defer.succeed(result) else: # Result is not ready if block: d = defer.Deferred() self.deferreds_to_callback[deferred_id] = d return d else: return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferred_id))) def two_phase(wrapped_method): """Wrap methods that return a deferred into a two phase process. This transforms:: foo(arg1, arg2, ...) -> foo(arg1, arg2,...,block=True). The wrapped method will then return a deferred to a deferred id. This will only work on method of classes that inherit from `PendingDeferredManager`, as that class provides an API for block is a boolean to determine if we should use the two phase process or just simply call the wrapped method. At this point block does not have a default and it probably won't. """ def wrapper_two_phase(pdm, *args, **kwargs): try: block = kwargs.pop('block') except KeyError: block = True # The default if not specified if block: return wrapped_method(pdm, *args, **kwargs) else: d = wrapped_method(pdm, *args, **kwargs) deferred_id=pdm.save_pending_deferred(d) return defer.succeed(deferred_id) return wrapper_two_phase