twistedutil.py
274 lines
| 9.7 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
"""Things directly related to all of twisted.""" | ||||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
Brian E Granger
|
r1234 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Imports | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1944 | import os, sys | ||
Brian Granger
|
r2498 | import threading, Queue | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1944 | import twisted | ||
Brian E Granger
|
r1234 | from twisted.internet import defer, reactor | ||
from twisted.python import log, failure | ||||
Brian Granger
|
r1944 | from IPython.kernel.error import FileTimeoutError | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Classes related to twisted and threads | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
class ReactorInThread(threading.Thread):
    """A thread whose body is the twisted reactor's main loop.

    For the process to be able to exit cleanly, mark the thread as a daemon
    before starting it::

        rit = ReactorInThread()
        rit.setDaemon(True)
        rit.start()
    """

    def stop(self):
        """Stop the reactor from the calling thread, then join this thread."""
        # NOTE: the original author doubted that this does anything useful.
        blockingCallFromThread(reactor.stop)
        self.join()

    def run(self):
        """Thread entry point: run the twisted reactor.

        The reactor is started with installSignalHandlers=0 so that twisted
        does not install any signal handlers of its own.  That is required
        here because signal.signal can't be called in a thread.  The only
        drawback is that SIGCHLD events won't be detected, so spawnProcess
        won't notice when its processes have been killed by an external
        factor.
        """
        reactor.run(installSignalHandlers=0)
# Twisted 8 and later ship their own blockingCallFromThread; older versions
# get the hand-written fallback defined in the else branch.
if twisted.version.major >= 8:
    import twisted.internet.threads

    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function
        gets a result.

        Delegates to
        twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
        passing twisted.internet.reactor for the first argument.

        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        return twisted.internet.threads.blockingCallFromThread(
            reactor, f, *a, **kw)
else:
    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function
        gets a result.

        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        from twisted.internet import reactor
        # The queue hands the outcome (result or Failure) from the reactor
        # thread back to the calling thread.
        queue = Queue.Queue()

        def _callFromThread():
            result = defer.maybeDeferred(f, *a, **kw)
            # Both success and failure are delivered through the queue.
            result.addBoth(queue.put)

        reactor.callFromThread(_callFromThread)
        # Blocks the calling thread until the callback chain has produced
        # an outcome.
        result = queue.get()
        if isinstance(result, failure.Failure):
            # This makes it easier for the debugger to get access to the
            # instance.  The "as" spelling is accepted by Python 2.6+ and
            # Python 3, unlike the old comma form.
            try:
                result.raiseException()
            except Exception as e:
                raise e
        return result
#------------------------------------------------------------------------------- | ||||
# Things for managing deferreds | ||||
#------------------------------------------------------------------------------- | ||||
def parseResults(results):
    """Pull out results/Failures from a DeferredList.

    `results` is a sequence of (success, result) entries; the returned list
    holds element 1 of every entry, in the original order.
    """
    values = []
    for entry in results:
        values.append(entry[1])
    return values
def gatherBoth(dlist, fireOnOneCallback=0,
               fireOnOneErrback=0,
               consumeErrors=0,
               logErrors=0):
    """Collect the outcomes of several deferreds into a single deferred.

    The deferreds in `dlist` are wrapped in this module's DeferredList and
    all four flags (each defaulting to 0) are forwarded to it unchanged.
    Unless `fireOnOneCallback` is set, parseResults is added as a callback
    so the final result is a plain list of results/Failures rather than a
    list of (success, result) tuples.
    """
    combined = DeferredList(dlist,
                            fireOnOneCallback=fireOnOneCallback,
                            fireOnOneErrback=fireOnOneErrback,
                            consumeErrors=consumeErrors,
                            logErrors=logErrors)
    if fireOnOneCallback:
        # The deferred fires with a single (result, index) pair in this
        # mode, so there is no list of tuples to unpack.
        return combined
    combined.addCallback(parseResults)
    return combined
# Flags stored as the first element of each (success, result) tuple.
SUCCESS = True
FAILURE = False


class DeferredList(defer.Deferred):
    """I combine a group of deferreds into one callback.

    I track a list of L{Deferred}s for their callbacks, and make a single
    callback when they have all completed, a list of (success, result)
    tuples, 'success' being a boolean.

    Note that you can still use a L{Deferred} after putting it in a
    DeferredList.  For example, you can suppress 'Unhandled error in
    Deferred' messages by adding errbacks to the Deferreds *after* putting
    them in the DeferredList, as a DeferredList won't swallow the errors.
    (Although a more convenient way to do this is simply to set the
    consumeErrors flag.)

    Note: This is a modified version of the
    twisted.internet.defer.DeferredList
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0, logErrors=0):
        """Initialize a DeferredList.

        @type deferredList:  C{list} of L{Deferred}s
        @param deferredList: The list of deferreds to track.
        @param fireOnOneCallback: (keyword param) a flag indicating that
                             only one callback needs to be fired for me to
                             call my callback
        @param fireOnOneErrback: (keyword param) a flag indicating that
                            only one errback needs to be fired for me to
                            call my errback
        @param consumeErrors: (keyword param) a flag indicating that any
                            errors raised in the original deferreds should
                            be consumed by this DeferredList.  This is
                            useful to prevent spurious warnings being
                            logged.
        @param logErrors: (keyword param) a flag indicating that every
                            failure seen should be passed to log.err.
        """
        expected = len(deferredList)
        self.resultList = [None] * expected
        defer.Deferred.__init__(self)
        if expected == 0 and not fireOnOneCallback:
            # Nothing to wait for: fire immediately with the empty list.
            self.callback(self.resultList)

        # These flags need to be set *before* attaching callbacks to the
        # deferreds, because the callbacks use these flags, and will run
        # synchronously if any of the deferreds are already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.logErrors = logErrors
        self.finishedCount = 0

        for index, deferred in enumerate(deferredList):
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(index, SUCCESS),
                                  errbackArgs=(index, FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """(internal) Callback for when one of my deferreds fires.

        Records (succeeded, result) at position `index`, fires this
        deferred when the configured condition is met, and returns the
        value handed on to the tracked deferred's own chain: `result`, or
        None for a failure when consumeErrors is set.
        """
        self.resultList[index] = (succeeded, result)
        self.finishedCount += 1
        failed = succeeded == FAILURE

        if not self.called:
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))
            elif failed and self.fireOnOneErrback:
                # We have modified this to fire the errback chain with the
                # actual Failure instance that originally occurred rather
                # than twisted's FirstError which wraps the failure.
                self.errback(result)
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if failed:
            if self.logErrors:
                log.err(result)
            if self.consumeErrors:
                return None
        return result
Brian Granger
|
r1944 | |||
def wait_for_file(filename, delay=0.1, max_tries=10):
    """Poll until a file exists.

    Returns a Deferred that fires once ``os.path.isfile(filename)`` is
    true.  The first check happens immediately; each later check is
    scheduled with ``reactor.callLater`` after `delay` seconds.  Once
    `max_tries` attempts have been used up, the Deferred errbacks with a
    `FileTimeoutError` instead.

    Parameters
    ----------
    filename : str
        The name of the file to wait for.
    delay : float
        The time to wait between polls.
    max_tries : int
        The max number of attempts before raising `FileTimeoutError`

    Returns
    -------
    d : Deferred
        A Deferred instance that will fire (with True) when the file exists.
    """
    d = defer.Deferred()

    def _test_for_file(filename, attempt=0):
        # Out of attempts: report the timeout through the errback chain.
        if attempt >= max_tries:
            d.errback(FileTimeoutError(
                'timeout waiting for file to be created: %s' % filename
            ))
            return
        if os.path.isfile(filename):
            d.callback(True)
            return
        # Not there yet; look again after the configured delay.
        reactor.callLater(delay, _test_for_file, filename, attempt+1)

    _test_for_file(filename)
    return d
Brian Granger
|
r2302 | |||
def sleep_deferred(seconds):
    """Sleep without blocking the event loop.

    Returns a Deferred whose callback is scheduled with
    ``reactor.callLater`` to run after `seconds`; it is called back with
    `seconds` as its result.
    """
    wakeup = defer.Deferred()
    reactor.callLater(seconds, wakeup.callback, seconds)
    return wakeup
Brian Granger
|
r2303 | |||
def make_deferred(func):
    """A decorator that calls a function with :func:`maybeDeferred`.

    The returned wrapper forwards all positional and keyword arguments to
    ``defer.maybeDeferred(func, ...)`` and returns whatever that returns.
    The wrapper also takes over the metadata of `func` (name, docstring,
    ...) so decorated functions stay recognizable in tracebacks and help
    output.
    """
    import functools

    # Copy only the metadata attributes that func really has: on Python 2,
    # functools.wraps raises AttributeError for callables that lack e.g.
    # __name__ (functools.partial objects, callable instances).
    assigned = [name for name in functools.WRAPPER_ASSIGNMENTS
                if hasattr(func, name)]

    @functools.wraps(func, assigned=assigned)
    def _wrapper(*args, **kwargs):
        return defer.maybeDeferred(func, *args, **kwargs)
    return _wrapper