#!/usr/bin/env python # encoding: utf-8 """Things directly related to all of twisted.""" #----------------------------------------------------------------------------- # Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import os, sys import threading, Queue, atexit import twisted from twisted.internet import defer, reactor from twisted.python import log, failure from IPython.kernel.error import FileTimeoutError #----------------------------------------------------------------------------- # Classes related to twisted and threads #----------------------------------------------------------------------------- class ReactorInThread(threading.Thread): """Run the twisted reactor in a different thread. For the process to be able to exit cleanly, do the following: rit = ReactorInThread() rit.setDaemon(True) rit.start() """ def run(self): """Run the twisted reactor in a thread. This runs the reactor with installSignalHandlers=0, which prevents twisted from installing any of its own signal handlers. This needs to be disabled because signal.signal can't be called in a thread. The only problem with this is that SIGCHLD events won't be detected so spawnProcess won't detect that its processes have been killed by an external factor. """ reactor.run(installSignalHandlers=0) # self.join() def stop(self): # I don't think this does anything useful. blockingCallFromThread(reactor.stop) self.join() if(twisted.version.major >= 8): import twisted.internet.threads def blockingCallFromThread(f, *a, **kw): """ Run a function in the reactor from a thread, and wait for the result synchronously, i.e. until the callback chain returned by the function get a result. Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw), passing twisted.internet.reactor for the first argument. @param f: the callable to run in the reactor thread @type f: any callable. @param a: the arguments to pass to C{f}. @param kw: the keyword arguments to pass to C{f}. @return: the result of the callback chain. @raise: any error raised during the callback chain. """ return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw) else: def blockingCallFromThread(f, *a, **kw): """ Run a function in the reactor from a thread, and wait for the result synchronously, i.e. until the callback chain returned by the function get a result. @param f: the callable to run in the reactor thread @type f: any callable. @param a: the arguments to pass to C{f}. @param kw: the keyword arguments to pass to C{f}. @return: the result of the callback chain. @raise: any error raised during the callback chain. """ from twisted.internet import reactor queue = Queue.Queue() def _callFromThread(): result = defer.maybeDeferred(f, *a, **kw) result.addBoth(queue.put) reactor.callFromThread(_callFromThread) result = queue.get() if isinstance(result, failure.Failure): # This makes it easier for the debugger to get access to the instance try: result.raiseException() except Exception, e: raise e return result #------------------------------------------------------------------------------- # Things for managing deferreds #------------------------------------------------------------------------------- def parseResults(results): """Pull out results/Failures from a DeferredList.""" return [x[1] for x in results] def gatherBoth(dlist, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0, logErrors=0): """This is like gatherBoth, but sets consumeErrors=1.""" d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback, consumeErrors, logErrors) if not fireOnOneCallback: d.addCallback(parseResults) return d SUCCESS = True FAILURE = False class DeferredList(defer.Deferred): """I combine a group of deferreds into one callback. I track a list of L{Deferred}s for their callbacks, and make a single callback when they have all completed, a list of (success, result) tuples, 'success' being a boolean. Note that you can still use a L{Deferred} after putting it in a DeferredList. For example, you can suppress 'Unhandled error in Deferred' messages by adding errbacks to the Deferreds *after* putting them in the DeferredList, as a DeferredList won't swallow the errors. (Although a more convenient way to do this is simply to set the consumeErrors flag) Note: This is a modified version of the twisted.internet.defer.DeferredList """ fireOnOneCallback = 0 fireOnOneErrback = 0 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0, logErrors=0): """Initialize a DeferredList. @type deferredList: C{list} of L{Deferred}s @param deferredList: The list of deferreds to track. @param fireOnOneCallback: (keyword param) a flag indicating that only one callback needs to be fired for me to call my callback @param fireOnOneErrback: (keyword param) a flag indicating that only one errback needs to be fired for me to call my errback @param consumeErrors: (keyword param) a flag indicating that any errors raised in the original deferreds should be consumed by this DeferredList. This is useful to prevent spurious warnings being logged. """ self.resultList = [None] * len(deferredList) defer.Deferred.__init__(self) if len(deferredList) == 0 and not fireOnOneCallback: self.callback(self.resultList) # These flags need to be set *before* attaching callbacks to the # deferreds, because the callbacks use these flags, and will run # synchronously if any of the deferreds are already fired. self.fireOnOneCallback = fireOnOneCallback self.fireOnOneErrback = fireOnOneErrback self.consumeErrors = consumeErrors self.logErrors = logErrors self.finishedCount = 0 index = 0 for deferred in deferredList: deferred.addCallbacks(self._cbDeferred, self._cbDeferred, callbackArgs=(index,SUCCESS), errbackArgs=(index,FAILURE)) index = index + 1 def _cbDeferred(self, result, index, succeeded): """(internal) Callback for when one of my deferreds fires. """ self.resultList[index] = (succeeded, result) self.finishedCount += 1 if not self.called: if succeeded == SUCCESS and self.fireOnOneCallback: self.callback((result, index)) elif succeeded == FAILURE and self.fireOnOneErrback: # We have modified this to fire the errback chain with the actual # Failure instance the originally occured rather than twisted's # FirstError which wraps the failure self.errback(result) elif self.finishedCount == len(self.resultList): self.callback(self.resultList) if succeeded == FAILURE and self.logErrors: log.err(result) if succeeded == FAILURE and self.consumeErrors: result = None return result def wait_for_file(filename, delay=0.1, max_tries=10): """Wait (poll) for a file to be created. This method returns a Deferred that will fire when a file exists. It works by polling os.path.isfile in time intervals specified by the delay argument. If `max_tries` is reached, it will errback with a `FileTimeoutError`. Parameters ---------- filename : str The name of the file to wait for. delay : float The time to wait between polls. max_tries : int The max number of attempts before raising `FileTimeoutError` Returns ------- d : Deferred A Deferred instance that will fire when the file exists. """ d = defer.Deferred() def _test_for_file(filename, attempt=0): if attempt >= max_tries: d.errback(FileTimeoutError( 'timeout waiting for file to be created: %s' % filename )) else: if os.path.isfile(filename): d.callback(True) else: reactor.callLater(delay, _test_for_file, filename, attempt+1) _test_for_file(filename) return d def sleep_deferred(seconds): """Sleep without blocking the event loop.""" d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) return d def make_deferred(func): """A decorator that calls a function with :func`maybeDeferred`.""" def _wrapper(*args, **kwargs): return defer.maybeDeferred(func, *args, **kwargs) return _wrapper