##// END OF EJS Templates
Adding information about IPYTHONDIR to usage of ipcluster and friends.
Adding information about IPYTHONDIR to usage of ipcluster and friends.

File last commit:

r1234:52b55407
r1959:7604468a
Show More
pendingdeferred.py
178 lines | 6.8 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.test.test_pendingdeferred -*-
"""Classes to manage pending Deferreds.
A pending deferred is a deferred that may or may not have fired. This module
is useful for taking a class whose methods return deferreds and wrapping it to
provide API that keeps track of those deferreds for later retrieval. See the
tests for examples of its usage.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python import log, components, failure
from zope.interface import Interface, implements, Attribute
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel import error
from IPython.external import guid
from IPython.tools import growl
class PendingDeferredManager(object):
"""A class to track pending deferreds.
To track a pending deferred, the user of this class must first
get a deferredID by calling `get_next_deferred_id`. Then the user
calls `save_pending_deferred` passing that id and the deferred to
be tracked. To later retrieve it, the user calls
`get_pending_deferred` passing the id.
"""
def __init__(self):
"""Manage pending deferreds."""
self.results = {} # Populated when results are ready
self.deferred_ids = [] # List of deferred ids I am managing
self.deferreds_to_callback = {} # dict of lists of deferreds to callback
def get_deferred_id(self):
return guid.generate()
def quick_has_id(self, deferred_id):
return deferred_id in self.deferred_ids
def _save_result(self, result, deferred_id):
if self.quick_has_id(deferred_id):
self.results[deferred_id] = result
self._trigger_callbacks(deferred_id)
def _trigger_callbacks(self, deferred_id):
# Go through and call the waiting callbacks
result = self.results.get(deferred_id)
if result is not None: # Only trigger if there is a result
try:
d = self.deferreds_to_callback.pop(deferred_id)
except KeyError:
d = None
if d is not None:
if isinstance(result, failure.Failure):
d.errback(result)
else:
d.callback(result)
self.delete_pending_deferred(deferred_id)
def save_pending_deferred(self, d, deferred_id=None):
"""Save the result of a deferred for later retrieval.
This works even if the deferred has not fired.
Only callbacks and errbacks applied to d before this method
is called will be called no the final result.
"""
if deferred_id is None:
deferred_id = self.get_deferred_id()
self.deferred_ids.append(deferred_id)
d.addBoth(self._save_result, deferred_id)
return deferred_id
def _protected_del(self, key, container):
try:
del container[key]
except Exception:
pass
def delete_pending_deferred(self, deferred_id):
"""Remove a deferred I am tracking and add a null Errback.
:Parameters:
deferredID : str
The id of a deferred that I am tracking.
"""
if self.quick_has_id(deferred_id):
# First go through a errback any deferreds that are still waiting
d = self.deferreds_to_callback.get(deferred_id)
if d is not None:
d.errback(failure.Failure(error.AbortedPendingDeferredError("pending deferred has been deleted: %r"%deferred_id)))
# Now delete all references to this deferred_id
ind = self.deferred_ids.index(deferred_id)
self._protected_del(ind, self.deferred_ids)
self._protected_del(deferred_id, self.deferreds_to_callback)
self._protected_del(deferred_id, self.results)
else:
raise error.InvalidDeferredID('invalid deferred_id: %r' % deferred_id)
def clear_pending_deferreds(self):
"""Remove all the deferreds I am tracking."""
for did in self.deferred_ids:
self.delete_pending_deferred(did)
def _delete_and_pass_through(self, r, deferred_id):
self.delete_pending_deferred(deferred_id)
return r
def get_pending_deferred(self, deferred_id, block):
if not self.quick_has_id(deferred_id) or self.deferreds_to_callback.get(deferred_id) is not None:
return defer.fail(failure.Failure(error.InvalidDeferredID('invalid deferred_id: %r' + deferred_id)))
result = self.results.get(deferred_id)
if result is not None:
self.delete_pending_deferred(deferred_id)
if isinstance(result, failure.Failure):
return defer.fail(result)
else:
return defer.succeed(result)
else: # Result is not ready
if block:
d = defer.Deferred()
self.deferreds_to_callback[deferred_id] = d
return d
else:
return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferred_id)))
def two_phase(wrapped_method):
"""Wrap methods that return a deferred into a two phase process.
This transforms::
foo(arg1, arg2, ...) -> foo(arg1, arg2,...,block=True).
The wrapped method will then return a deferred to a deferred id. This will
only work on method of classes that inherit from `PendingDeferredManager`,
as that class provides an API for
block is a boolean to determine if we should use the two phase process or
just simply call the wrapped method. At this point block does not have a
default and it probably won't.
"""
def wrapper_two_phase(pdm, *args, **kwargs):
try:
block = kwargs.pop('block')
except KeyError:
block = True # The default if not specified
if block:
return wrapped_method(pdm, *args, **kwargs)
else:
d = wrapped_method(pdm, *args, **kwargs)
deferred_id=pdm.save_pending_deferred(d)
return defer.succeed(deferred_id)
return wrapper_two_phase