##// END OF EJS Templates
Fixing logic in heartbeat monitor.
Fixing logic in heartbeat monitor.

File last commit:

r2498:3eae1372
r2925:c8055a68
Show More
pendingdeferred.py
174 lines | 6.6 KiB | text/x-python | PythonLexer
# encoding: utf-8
# -*- test-case-name: IPython.kernel.test.test_pendingdeferred -*-
"""Classes to manage pending Deferreds.
A pending deferred is a deferred that may or may not have fired. This module
is useful for taking a class whose methods return deferreds and wrapping it to
provide an API that keeps track of those deferreds for later retrieval. See the
tests for examples of its usage.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from twisted.internet import defer
from twisted.python import failure
from IPython.kernel import error
from IPython.external import guid
class PendingDeferredManager(object):
    """A class to track pending deferreds.

    To track a pending deferred, the user of this class calls
    `save_pending_deferred`, passing the deferred to be tracked and,
    optionally, an id previously obtained from `get_deferred_id`.  The id
    under which the deferred is tracked is returned.  To later retrieve
    the result, the user calls `get_pending_deferred` passing that id.
    """

    def __init__(self):
        """Manage pending deferreds."""
        # deferred_id -> result (or Failure); populated when results are ready
        self.results = {}
        # List of deferred ids I am managing
        self.deferred_ids = []
        # deferred_id -> the single deferred waiting (blocking) on that result
        self.deferreds_to_callback = {}

    def get_deferred_id(self):
        """Return a new id, produced by `guid.generate`."""
        return guid.generate()

    def quick_has_id(self, deferred_id):
        """Return True if `deferred_id` is currently being tracked."""
        return deferred_id in self.deferred_ids

    def _save_result(self, result, deferred_id):
        """Store the result (or Failure) of a tracked deferred.

        Installed with `addBoth` by `save_pending_deferred`.  Results for
        ids that are no longer tracked are dropped.  Nothing is returned,
        so the tracked deferred's own chain continues with None.
        """
        if self.quick_has_id(deferred_id):
            self.results[deferred_id] = result
            self._trigger_callbacks(deferred_id)

    def _trigger_callbacks(self, deferred_id):
        """Fire the deferred waiting on `deferred_id`, if the result is in."""
        # Test for membership rather than `is not None`, so that a deferred
        # that legitimately fired with None still releases its waiter.
        if deferred_id not in self.results:
            return
        result = self.results[deferred_id]
        waiting = self.deferreds_to_callback.pop(deferred_id, None)
        if waiting is None:
            # Nobody is waiting yet; keep the result for later retrieval.
            return
        if isinstance(result, failure.Failure):
            waiting.errback(result)
        else:
            waiting.callback(result)
        # The result has been delivered, so stop tracking this id.
        self.delete_pending_deferred(deferred_id)

    def save_pending_deferred(self, d, deferred_id=None):
        """Save the result of a deferred for later retrieval.

        This works even if the deferred has not fired.

        Only callbacks and errbacks applied to d before this method
        is called will be called on the final result.

        :Parameters:
            d : Deferred
                The deferred whose result should be tracked.
            deferred_id : str
                The id to track it under; a new one is generated if None.

        :Returns: the id under which the deferred is tracked.
        """
        if deferred_id is None:
            deferred_id = self.get_deferred_id()
        # Register the id first: if d has already fired, addBoth runs
        # _save_result immediately and it only stores results of known ids.
        self.deferred_ids.append(deferred_id)
        d.addBoth(self._save_result, deferred_id)
        return deferred_id

    def _protected_del(self, key, container):
        """Delete `container[key]`, ignoring a missing key or index."""
        try:
            del container[key]
        except LookupError:
            # KeyError / IndexError: nothing to delete.
            pass

    def delete_pending_deferred(self, deferred_id):
        """Remove a deferred I am tracking and add a null Errback.

        If a deferred is still waiting on this id, it is errbacked with
        `AbortedPendingDeferredError` before all references are dropped.

        :Parameters:
            deferred_id : str
                The id of a deferred that I am tracking.

        :Raises: `InvalidDeferredID` if the id is not being tracked.
        """
        if not self.quick_has_id(deferred_id):
            raise error.InvalidDeferredID(
                'invalid deferred_id: %r' % (deferred_id,))
        # First errback any deferred that is still waiting
        waiting = self.deferreds_to_callback.get(deferred_id)
        if waiting is not None:
            waiting.errback(failure.Failure(error.AbortedPendingDeferredError(
                "pending deferred has been deleted: %r" % (deferred_id,))))
        # Now delete all references to this deferred_id
        self.deferred_ids.remove(deferred_id)
        self._protected_del(deferred_id, self.deferreds_to_callback)
        self._protected_del(deferred_id, self.results)

    def clear_pending_deferreds(self):
        """Remove all the deferreds I am tracking."""
        # Iterate over a copy: delete_pending_deferred mutates
        # self.deferred_ids, and iterating the live list would skip ids.
        for did in list(self.deferred_ids):
            self.delete_pending_deferred(did)

    def _delete_and_pass_through(self, r, deferred_id):
        """Callback helper: stop tracking `deferred_id`, return `r` as is."""
        self.delete_pending_deferred(deferred_id)
        return r

    def get_pending_deferred(self, deferred_id, block):
        """Retrieve the result of a tracked deferred.

        :Parameters:
            deferred_id : str
                The id returned by `save_pending_deferred`.
            block : bool
                If the result is not ready: when True, return a deferred
                that fires once it is; when False, fail immediately with
                `ResultNotCompleted`.

        :Returns: a deferred to the result.  It fails with
            `InvalidDeferredID` if the id is unknown or already has a
            waiter.  A ready result is handed out once and then forgotten.
        """
        already_waiting = self.deferreds_to_callback.get(deferred_id) is not None
        if not self.quick_has_id(deferred_id) or already_waiting:
            return defer.fail(failure.Failure(error.InvalidDeferredID(
                'invalid deferred_id: %r' % (deferred_id,))))
        if deferred_id in self.results:
            result = self.results[deferred_id]
            self.delete_pending_deferred(deferred_id)
            if isinstance(result, failure.Failure):
                return defer.fail(result)
            return defer.succeed(result)
        # Result is not ready
        if block:
            d = defer.Deferred()
            self.deferreds_to_callback[deferred_id] = d
            return d
        return defer.fail(failure.Failure(error.ResultNotCompleted(
            "result not completed: %r" % (deferred_id,))))
def two_phase(wrapped_method):
    """Wrap methods that return a deferred into a two phase process.

    This transforms::

        foo(arg1, arg2, ...) -> foo(arg1, arg2,...,block=True).

    When called with ``block=False`` the wrapped method returns a deferred
    to a deferred id instead of a deferred to the result.  This will only
    work on methods of classes that inherit from `PendingDeferredManager`,
    as that class provides the `save_pending_deferred` API used to track
    the deferred for later retrieval.

    block is a boolean to determine if we should use the two phase process
    or just simply call the wrapped method.  It is passed by keyword and
    defaults to True (simply call the wrapped method) when not specified.
    """
    # Function-scope import so this block does not depend on the
    # module-level import list.
    import functools

    @functools.wraps(wrapped_method)  # keep the method's name and docstring
    def wrapper_two_phase(pdm, *args, **kwargs):
        # `block` is consumed here and never forwarded to the wrapped method.
        block = kwargs.pop('block', True)
        d = wrapped_method(pdm, *args, **kwargs)
        if block:
            return d
        # Second phase: hand back an id that can be redeemed later.
        deferred_id = pdm.save_pending_deferred(d)
        return defer.succeed(deferred_id)
    return wrapper_two_phase