#!/usr/bin/env python
# encoding: utf-8

"""Things directly related to all of twisted."""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
#  Copyright (C) 2008  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import threading
import atexit
try:
    import Queue
except ImportError:
    # The standard library module is called ``queue`` on Python 3.
    import queue as Queue

import twisted
from twisted.internet import defer, reactor
from twisted.python import log, failure

#-------------------------------------------------------------------------------
# Classes related to twisted and threads
#-------------------------------------------------------------------------------


class ReactorInThread(threading.Thread):
    """Run the twisted reactor in a different thread.

    For the process to be able to exit cleanly, do the following:

    rit = ReactorInThread()
    rit.setDaemon(True)
    rit.start()

    """

    def run(self):
        """Thread body: run the reactor without installing signal handlers."""
        reactor.run(installSignalHandlers=0)

    def stop(self):
        """Ask the reactor to stop, then join this thread."""
        # I don't think this does anything useful.
        blockingCallFromThread(reactor.stop)
        self.join()


if twisted.version.major >= 8:
    import twisted.internet.threads

    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function
        get a result.

        Delegates to twisted.internet.threads.blockingCallFromThread(reactor,
        f, *a, **kw), passing twisted.internet.reactor for the first argument.

        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        return twisted.internet.threads.blockingCallFromThread(
            reactor, f, *a, **kw)

else:

    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function
        get a result.

        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        from twisted.internet import reactor
        result_queue = Queue.Queue()

        def _callFromThread():
            # Runs in the reactor thread; whatever the chain ends with
            # (result or Failure) is handed back through the queue.
            result = defer.maybeDeferred(f, *a, **kw)
            result.addBoth(result_queue.put)

        reactor.callFromThread(_callFromThread)
        # Block the calling thread until the reactor thread delivers a value.
        result = result_queue.get()
        if isinstance(result, failure.Failure):
            # This makes it easier for the debugger to get access to the instance
            try:
                result.raiseException()
            except Exception as e:
                raise e
        return result


#-------------------------------------------------------------------------------
# Things for managing deferreds
#-------------------------------------------------------------------------------


def parseResults(results):
    """Pull out results/Failures from a DeferredList.

    @param results: a sequence of (success, result) tuples.
    @return: a list holding the second item of every tuple, in order.
    """
    return [x[1] for x in results]


def gatherBoth(dlist, fireOnOneCallback=0, fireOnOneErrback=0,
               consumeErrors=0, logErrors=0):
    """Gather the outcomes of several deferreds, successes and failures alike.

    Builds this module's L{DeferredList} from C{dlist}, forwarding every flag
    unchanged (note that C{consumeErrors} defaults to 0 here).

    Unless C{fireOnOneCallback} is set, the (success, result) tuples are
    reduced by L{parseResults} to a plain list holding each result or Failure
    in the order of C{dlist}.  With C{fireOnOneCallback} set nothing is
    stripped and the callback value is whatever the DeferredList fires with.

    @param dlist: the list of deferreds to track.
    @return: the L{DeferredList} that was created.
    """
    d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
                     consumeErrors, logErrors)
    if not fireOnOneCallback:
        d.addCallback(parseResults)
    return d


SUCCESS = True
FAILURE = False


class DeferredList(defer.Deferred):
    """I combine a group of deferreds into one callback.

    I track a list of L{Deferred}s for their callbacks, and make a single
    callback when they have all completed, a list of (success, result)
    tuples, 'success' being a boolean.

    Note that you can still use a L{Deferred} after putting it in a
    DeferredList.  For example, you can suppress 'Unhandled error in Deferred'
    messages by adding errbacks to the Deferreds *after* putting them in the
    DeferredList, as a DeferredList won't swallow the errors.  (Although a more
    convenient way to do this is simply to set the consumeErrors flag)

    Note: This is a modified version of the twisted.internet.defer.DeferredList
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0, logErrors=0):
        """Initialize a DeferredList.

        @type deferredList:  C{list} of L{Deferred}s
        @param deferredList: The list of deferreds to track.
        @param fireOnOneCallback: (keyword param) a flag indicating that
                             only one callback needs to be fired for me to call
                             my callback
        @param fireOnOneErrback: (keyword param) a flag indicating that
                            only one errback needs to be fired for me to call
                            my errback
        @param consumeErrors: (keyword param) a flag indicating that any errors
                            raised in the original deferreds should be
                            consumed by this DeferredList.  This is useful to
                            prevent spurious warnings being logged.
        @param logErrors: (keyword param) a flag indicating that every failure
                            of a tracked deferred should be passed to
                            twisted.python.log.err.
        """
        self.resultList = [None] * len(deferredList)
        defer.Deferred.__init__(self)
        if len(deferredList) == 0 and not fireOnOneCallback:
            # Nothing to wait for: fire immediately with the empty list.
            self.callback(self.resultList)

        # These flags need to be set *before* attaching callbacks to the
        # deferreds, because the callbacks use these flags, and will run
        # synchronously if any of the deferreds are already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.logErrors = logErrors
        self.finishedCount = 0

        for index, deferred in enumerate(deferredList):
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(index, SUCCESS),
                                  errbackArgs=(index, FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """(internal) Callback for when one of my deferreds fires.

        @param result: the result or Failure the tracked deferred fired with.
        @param index: position of that deferred in the tracked list.
        @param succeeded: SUCCESS or FAILURE.
        @return: C{result}, or None for a failure when consumeErrors is set,
                 so the tracked deferred's own chain continues with it.
        """
        self.resultList[index] = (succeeded, result)

        self.finishedCount += 1
        if not self.called:
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))
            elif succeeded == FAILURE and self.fireOnOneErrback:
                # We have modified this to fire the errback chain with the
                # actual Failure instance that originally occurred rather than
                # twisted's FirstError which wraps the failure
                self.errback(result)
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if succeeded == FAILURE and self.logErrors:
            log.err(result)
        if succeeded == FAILURE and self.consumeErrors:
            result = None

        return result