twistedutil.py
206 lines
| 7.7 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
"""Things directly related to all of twisted.""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import threading, Queue, atexit | ||||
import twisted | ||||
from twisted.internet import defer, reactor | ||||
from twisted.python import log, failure | ||||
#------------------------------------------------------------------------------- | ||||
# Classes related to twisted and threads | ||||
#------------------------------------------------------------------------------- | ||||
class ReactorInThread(threading.Thread): | ||||
"""Run the twisted reactor in a different thread. | ||||
For the process to be able to exit cleanly, do the following: | ||||
rit = ReactorInThread() | ||||
rit.setDaemon(True) | ||||
rit.start() | ||||
""" | ||||
def run(self): | ||||
reactor.run(installSignalHandlers=0) | ||||
# self.join() | ||||
def stop(self): | ||||
# I don't think this does anything useful. | ||||
blockingCallFromThread(reactor.stop) | ||||
self.join() | ||||
if(twisted.version.major >= 8): | ||||
import twisted.internet.threads | ||||
def blockingCallFromThread(f, *a, **kw): | ||||
""" | ||||
Run a function in the reactor from a thread, and wait for the result | ||||
synchronously, i.e. until the callback chain returned by the function get a | ||||
result. | ||||
Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw), | ||||
passing twisted.internet.reactor for the first argument. | ||||
@param f: the callable to run in the reactor thread | ||||
@type f: any callable. | ||||
@param a: the arguments to pass to C{f}. | ||||
@param kw: the keyword arguments to pass to C{f}. | ||||
@return: the result of the callback chain. | ||||
@raise: any error raised during the callback chain. | ||||
""" | ||||
return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw) | ||||
else: | ||||
def blockingCallFromThread(f, *a, **kw): | ||||
""" | ||||
Run a function in the reactor from a thread, and wait for the result | ||||
synchronously, i.e. until the callback chain returned by the function get a | ||||
result. | ||||
@param f: the callable to run in the reactor thread | ||||
@type f: any callable. | ||||
@param a: the arguments to pass to C{f}. | ||||
@param kw: the keyword arguments to pass to C{f}. | ||||
@return: the result of the callback chain. | ||||
@raise: any error raised during the callback chain. | ||||
""" | ||||
from twisted.internet import reactor | ||||
queue = Queue.Queue() | ||||
def _callFromThread(): | ||||
result = defer.maybeDeferred(f, *a, **kw) | ||||
result.addBoth(queue.put) | ||||
reactor.callFromThread(_callFromThread) | ||||
result = queue.get() | ||||
if isinstance(result, failure.Failure): | ||||
# This makes it easier for the debugger to get access to the instance | ||||
try: | ||||
result.raiseException() | ||||
except Exception, e: | ||||
raise e | ||||
return result | ||||
#------------------------------------------------------------------------------- | ||||
# Things for managing deferreds | ||||
#------------------------------------------------------------------------------- | ||||
def parseResults(results): | ||||
"""Pull out results/Failures from a DeferredList.""" | ||||
return [x[1] for x in results] | ||||
def gatherBoth(dlist, fireOnOneCallback=0, | ||||
fireOnOneErrback=0, | ||||
consumeErrors=0, | ||||
logErrors=0): | ||||
"""This is like gatherBoth, but sets consumeErrors=1.""" | ||||
d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback, | ||||
consumeErrors, logErrors) | ||||
if not fireOnOneCallback: | ||||
d.addCallback(parseResults) | ||||
return d | ||||
SUCCESS = True | ||||
FAILURE = False | ||||
class DeferredList(defer.Deferred): | ||||
"""I combine a group of deferreds into one callback. | ||||
I track a list of L{Deferred}s for their callbacks, and make a single | ||||
callback when they have all completed, a list of (success, result) | ||||
tuples, 'success' being a boolean. | ||||
Note that you can still use a L{Deferred} after putting it in a | ||||
DeferredList. For example, you can suppress 'Unhandled error in Deferred' | ||||
messages by adding errbacks to the Deferreds *after* putting them in the | ||||
DeferredList, as a DeferredList won't swallow the errors. (Although a more | ||||
convenient way to do this is simply to set the consumeErrors flag) | ||||
Note: This is a modified version of the twisted.internet.defer.DeferredList | ||||
""" | ||||
fireOnOneCallback = 0 | ||||
fireOnOneErrback = 0 | ||||
def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, | ||||
consumeErrors=0, logErrors=0): | ||||
"""Initialize a DeferredList. | ||||
@type deferredList: C{list} of L{Deferred}s | ||||
@param deferredList: The list of deferreds to track. | ||||
@param fireOnOneCallback: (keyword param) a flag indicating that | ||||
only one callback needs to be fired for me to call | ||||
my callback | ||||
@param fireOnOneErrback: (keyword param) a flag indicating that | ||||
only one errback needs to be fired for me to call | ||||
my errback | ||||
@param consumeErrors: (keyword param) a flag indicating that any errors | ||||
raised in the original deferreds should be | ||||
consumed by this DeferredList. This is useful to | ||||
prevent spurious warnings being logged. | ||||
""" | ||||
self.resultList = [None] * len(deferredList) | ||||
defer.Deferred.__init__(self) | ||||
if len(deferredList) == 0 and not fireOnOneCallback: | ||||
self.callback(self.resultList) | ||||
# These flags need to be set *before* attaching callbacks to the | ||||
# deferreds, because the callbacks use these flags, and will run | ||||
# synchronously if any of the deferreds are already fired. | ||||
self.fireOnOneCallback = fireOnOneCallback | ||||
self.fireOnOneErrback = fireOnOneErrback | ||||
self.consumeErrors = consumeErrors | ||||
self.logErrors = logErrors | ||||
self.finishedCount = 0 | ||||
index = 0 | ||||
for deferred in deferredList: | ||||
deferred.addCallbacks(self._cbDeferred, self._cbDeferred, | ||||
callbackArgs=(index,SUCCESS), | ||||
errbackArgs=(index,FAILURE)) | ||||
index = index + 1 | ||||
def _cbDeferred(self, result, index, succeeded): | ||||
"""(internal) Callback for when one of my deferreds fires. | ||||
""" | ||||
self.resultList[index] = (succeeded, result) | ||||
self.finishedCount += 1 | ||||
if not self.called: | ||||
if succeeded == SUCCESS and self.fireOnOneCallback: | ||||
self.callback((result, index)) | ||||
elif succeeded == FAILURE and self.fireOnOneErrback: | ||||
# We have modified this to fire the errback chain with the actual | ||||
# Failure instance the originally occured rather than twisted's | ||||
# FirstError which wraps the failure | ||||
self.errback(result) | ||||
elif self.finishedCount == len(self.resultList): | ||||
self.callback(self.resultList) | ||||
if succeeded == FAILURE and self.logErrors: | ||||
log.err(result) | ||||
if succeeded == FAILURE and self.consumeErrors: | ||||
result = None | ||||
return result | ||||