pendingdeferred.py
178 lines
| 6.8 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.test.test_pendingdeferred -*- | ||||
"""Classes to manage pending Deferreds. | ||||
A pending deferred is a deferred that may or may not have fired. This module | ||||
is useful for taking a class whose methods return deferreds and wrapping it to | ||||
provide API that keeps track of those deferreds for later retrieval. See the | ||||
tests for examples of its usage. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
from twisted.application import service | ||||
from twisted.internet import defer, reactor | ||||
from twisted.python import log, components, failure | ||||
from zope.interface import Interface, implements, Attribute | ||||
from IPython.kernel.twistedutil import gatherBoth | ||||
from IPython.kernel import error | ||||
from IPython.external import guid | ||||
from IPython.tools import growl | ||||
class PendingDeferredManager(object):
    """A class to track pending deferreds.

    To track a pending deferred, call `save_pending_deferred` with the
    deferred and, optionally, an id obtained from `get_deferred_id` (one is
    generated when omitted).  The id under which the deferred is tracked is
    returned.  To later retrieve the result, call `get_pending_deferred`
    passing that id.
    """

    def __init__(self):
        """Manage pending deferreds."""
        # deferred_id -> result (or Failure); populated when a deferred fires
        self.results = {}
        # List of deferred ids I am managing
        self.deferred_ids = []
        # deferred_id -> the single deferred handed out by a blocking
        # `get_pending_deferred` call that is still waiting for its result
        self.deferreds_to_callback = {}

    def get_deferred_id(self):
        """Return a new deferred id produced by `guid.generate`."""
        return guid.generate()

    def quick_has_id(self, deferred_id):
        """Return True if `deferred_id` is currently being tracked."""
        return deferred_id in self.deferred_ids

    def _save_result(self, result, deferred_id):
        """Record the outcome of a tracked deferred and notify any waiter.

        Added to the tracked deferred with `addBoth`, so `result` may be a
        plain value or a `failure.Failure`.  Outcomes for ids that are no
        longer tracked are dropped.
        """
        if self.quick_has_id(deferred_id):
            self.results[deferred_id] = result
            self._trigger_callbacks(deferred_id)

    def _trigger_callbacks(self, deferred_id):
        """Fire the deferred waiting on `deferred_id`, if a result is stored.

        When a waiter exists it is called back (or errbacked for a
        `failure.Failure`) and the id is then deleted.  With no waiter the
        result is left in place for a later `get_pending_deferred` call.
        """
        # Test membership rather than `is not None`: None is a legitimate
        # result and must not be mistaken for "not fired yet".
        if deferred_id not in self.results:
            return
        result = self.results[deferred_id]
        # Pop the waiter first so that delete_pending_deferred below does
        # not errback the deferred we are about to fire.
        d = self.deferreds_to_callback.pop(deferred_id, None)
        if d is None:
            return
        if isinstance(result, failure.Failure):
            d.errback(result)
        else:
            d.callback(result)
        self.delete_pending_deferred(deferred_id)

    def save_pending_deferred(self, d, deferred_id=None):
        """Save the result of a deferred for later retrieval.

        This works even if the deferred has not fired.

        Only callbacks and errbacks applied to d before this method
        is called will be called on the final result.

        :Parameters:
            d : Deferred
                The deferred to track.
            deferred_id : str
                The id to track it under; generated when None.

        :Returns: the id under which the deferred is tracked.
        """
        if deferred_id is None:
            deferred_id = self.get_deferred_id()
        self.deferred_ids.append(deferred_id)
        d.addBoth(self._save_result, deferred_id)
        return deferred_id

    def _protected_del(self, key, container):
        """Delete `container[key]`, ignoring a missing or unusable key."""
        try:
            del container[key]
        except (KeyError, IndexError, TypeError):
            # Best-effort cleanup: nothing to delete is not an error here.
            pass

    def delete_pending_deferred(self, deferred_id):
        """Stop tracking a deferred, aborting anyone still waiting on it.

        A deferred still waiting on this id is errbacked with
        `error.AbortedPendingDeferredError`.

        :Parameters:
            deferred_id : str
                The id of a deferred that I am tracking.

        :Raises: `error.InvalidDeferredID` if the id is not tracked.
        """
        if not self.quick_has_id(deferred_id):
            raise error.InvalidDeferredID(
                'invalid deferred_id: %r' % (deferred_id,))
        # First errback the deferred that is still waiting, if any
        d = self.deferreds_to_callback.get(deferred_id)
        if d is not None:
            d.errback(failure.Failure(error.AbortedPendingDeferredError(
                "pending deferred has been deleted: %r" % (deferred_id,))))
        # Now delete all references to this deferred_id
        ind = self.deferred_ids.index(deferred_id)
        self._protected_del(ind, self.deferred_ids)
        self._protected_del(deferred_id, self.deferreds_to_callback)
        self._protected_del(deferred_id, self.results)

    def clear_pending_deferreds(self):
        """Remove all the deferreds I am tracking."""
        # Iterate over a snapshot: delete_pending_deferred mutates
        # self.deferred_ids, and looping over the live list skips ids.
        for did in list(self.deferred_ids):
            # A repeated id may already have been removed by an earlier pass.
            if self.quick_has_id(did):
                self.delete_pending_deferred(did)

    def _delete_and_pass_through(self, r, deferred_id):
        """Delete `deferred_id` and return `r` unchanged (callback helper)."""
        self.delete_pending_deferred(deferred_id)
        return r

    def get_pending_deferred(self, deferred_id, block):
        """Return a deferred to the result stored under `deferred_id`.

        :Parameters:
            deferred_id : str
                The id returned by `save_pending_deferred`.
            block : bool
                What to do when the result is not ready: if true, return a
                deferred that fires once it is; if false, fail immediately
                with `error.ResultNotCompleted`.

        The returned deferred fails with `error.InvalidDeferredID` when the
        id is not tracked or when a blocking request for it is already
        waiting.  Once a result has been delivered the id is deleted.
        """
        already_waiting = self.deferreds_to_callback.get(deferred_id) is not None
        if not self.quick_has_id(deferred_id) or already_waiting:
            return defer.fail(failure.Failure(error.InvalidDeferredID(
                'invalid deferred_id: %r' % (deferred_id,))))
        # Membership, not `is not None`, so a None result counts as ready.
        if deferred_id in self.results:
            result = self.results[deferred_id]
            self.delete_pending_deferred(deferred_id)
            if isinstance(result, failure.Failure):
                return defer.fail(result)
            return defer.succeed(result)
        # Result is not ready
        if block:
            d = defer.Deferred()
            self.deferreds_to_callback[deferred_id] = d
            return d
        return defer.fail(failure.Failure(error.ResultNotCompleted(
            "result not completed: %r" % (deferred_id,))))
def two_phase(wrapped_method):
    """Wrap a deferred-returning method so it can run as a two phase process.

    This transforms::

        foo(arg1, arg2, ...) -> foo(arg1, arg2, ..., block=True)

    The wrapper's first argument (`pdm`) must provide
    `save_pending_deferred`, as `PendingDeferredManager` does.

    `block` is a keyword-only boolean that is consumed by the wrapper and
    never forwarded to the wrapped method.  It defaults to True when not
    given:

    * block true: the wrapped method is simply called and whatever it
      returns is returned unchanged.
    * block false: the value returned by the wrapped method is handed to
      `pdm.save_pending_deferred`, and the id that call returns is wrapped
      with `defer.succeed` and returned.
    """
    def wrapper_two_phase(pdm, *args, **kwargs):
        # Strip our own flag before delegating; absent means blocking.
        blocking = kwargs.pop('block', True)
        outcome = wrapped_method(pdm, *args, **kwargs)
        if blocking:
            return outcome
        tracked_id = pdm.save_pending_deferred(outcome)
        return defer.succeed(tracked_id)
    return wrapper_two_phase