twistedutil.py
264 lines
| 9.3 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | #!/usr/bin/env python | |
# encoding: utf-8 | |||
"""Things directly related to all of twisted.""" | |||
__docformat__ = "restructuredtext en" | |||
#------------------------------------------------------------------------------- | |||
# Copyright (C) 2008 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#------------------------------------------------------------------------------- | |||
#------------------------------------------------------------------------------- | |||
# Imports | |||
#------------------------------------------------------------------------------- | |||
Brian Granger
|
r1944 | import os, sys | |
Brian E Granger
|
r1234 | import threading, Queue, atexit | |
Brian Granger
|
r1944 | import twisted | |
Brian E Granger
|
r1234 | from twisted.internet import defer, reactor | |
from twisted.python import log, failure | |||
Brian Granger
|
r1944 | from IPython.kernel.error import FileTimeoutError | |
Brian E Granger
|
r1234 | #------------------------------------------------------------------------------- | |
# Classes related to twisted and threads | |||
#------------------------------------------------------------------------------- | |||
class ReactorInThread(threading.Thread): | |||
"""Run the twisted reactor in a different thread. | |||
For the process to be able to exit cleanly, do the following: | |||
rit = ReactorInThread() | |||
rit.setDaemon(True) | |||
rit.start() | |||
""" | |||
def run(self): | |||
reactor.run(installSignalHandlers=0) | |||
# self.join() | |||
def stop(self): | |||
# I don't think this does anything useful. | |||
blockingCallFromThread(reactor.stop) | |||
self.join() | |||
if(twisted.version.major >= 8): | |||
import twisted.internet.threads | |||
def blockingCallFromThread(f, *a, **kw): | |||
""" | |||
Run a function in the reactor from a thread, and wait for the result | |||
synchronously, i.e. until the callback chain returned by the function get a | |||
result. | |||
Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw), | |||
passing twisted.internet.reactor for the first argument. | |||
@param f: the callable to run in the reactor thread | |||
@type f: any callable. | |||
@param a: the arguments to pass to C{f}. | |||
@param kw: the keyword arguments to pass to C{f}. | |||
@return: the result of the callback chain. | |||
@raise: any error raised during the callback chain. | |||
""" | |||
return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw) | |||
else: | |||
def blockingCallFromThread(f, *a, **kw): | |||
""" | |||
Run a function in the reactor from a thread, and wait for the result | |||
synchronously, i.e. until the callback chain returned by the function get a | |||
result. | |||
@param f: the callable to run in the reactor thread | |||
@type f: any callable. | |||
@param a: the arguments to pass to C{f}. | |||
@param kw: the keyword arguments to pass to C{f}. | |||
@return: the result of the callback chain. | |||
@raise: any error raised during the callback chain. | |||
""" | |||
from twisted.internet import reactor | |||
queue = Queue.Queue() | |||
def _callFromThread(): | |||
result = defer.maybeDeferred(f, *a, **kw) | |||
result.addBoth(queue.put) | |||
reactor.callFromThread(_callFromThread) | |||
result = queue.get() | |||
if isinstance(result, failure.Failure): | |||
# This makes it easier for the debugger to get access to the instance | |||
try: | |||
result.raiseException() | |||
except Exception, e: | |||
raise e | |||
return result | |||
#------------------------------------------------------------------------------- | |||
# Things for managing deferreds | |||
#------------------------------------------------------------------------------- | |||
def parseResults(results): | |||
"""Pull out results/Failures from a DeferredList.""" | |||
return [x[1] for x in results] | |||
def gatherBoth(dlist, fireOnOneCallback=0, | |||
fireOnOneErrback=0, | |||
consumeErrors=0, | |||
logErrors=0): | |||
"""This is like gatherBoth, but sets consumeErrors=1.""" | |||
d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback, | |||
consumeErrors, logErrors) | |||
if not fireOnOneCallback: | |||
d.addCallback(parseResults) | |||
return d | |||
SUCCESS = True | |||
FAILURE = False | |||
class DeferredList(defer.Deferred): | |||
"""I combine a group of deferreds into one callback. | |||
I track a list of L{Deferred}s for their callbacks, and make a single | |||
callback when they have all completed, a list of (success, result) | |||
tuples, 'success' being a boolean. | |||
Note that you can still use a L{Deferred} after putting it in a | |||
DeferredList. For example, you can suppress 'Unhandled error in Deferred' | |||
messages by adding errbacks to the Deferreds *after* putting them in the | |||
DeferredList, as a DeferredList won't swallow the errors. (Although a more | |||
convenient way to do this is simply to set the consumeErrors flag) | |||
Note: This is a modified version of the twisted.internet.defer.DeferredList | |||
""" | |||
fireOnOneCallback = 0 | |||
fireOnOneErrback = 0 | |||
def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, | |||
consumeErrors=0, logErrors=0): | |||
"""Initialize a DeferredList. | |||
@type deferredList: C{list} of L{Deferred}s | |||
@param deferredList: The list of deferreds to track. | |||
@param fireOnOneCallback: (keyword param) a flag indicating that | |||
only one callback needs to be fired for me to call | |||
my callback | |||
@param fireOnOneErrback: (keyword param) a flag indicating that | |||
only one errback needs to be fired for me to call | |||
my errback | |||
@param consumeErrors: (keyword param) a flag indicating that any errors | |||
raised in the original deferreds should be | |||
consumed by this DeferredList. This is useful to | |||
prevent spurious warnings being logged. | |||
""" | |||
self.resultList = [None] * len(deferredList) | |||
defer.Deferred.__init__(self) | |||
if len(deferredList) == 0 and not fireOnOneCallback: | |||
self.callback(self.resultList) | |||
# These flags need to be set *before* attaching callbacks to the | |||
# deferreds, because the callbacks use these flags, and will run | |||
# synchronously if any of the deferreds are already fired. | |||
self.fireOnOneCallback = fireOnOneCallback | |||
self.fireOnOneErrback = fireOnOneErrback | |||
self.consumeErrors = consumeErrors | |||
self.logErrors = logErrors | |||
self.finishedCount = 0 | |||
index = 0 | |||
for deferred in deferredList: | |||
deferred.addCallbacks(self._cbDeferred, self._cbDeferred, | |||
callbackArgs=(index,SUCCESS), | |||
errbackArgs=(index,FAILURE)) | |||
index = index + 1 | |||
def _cbDeferred(self, result, index, succeeded): | |||
"""(internal) Callback for when one of my deferreds fires. | |||
""" | |||
self.resultList[index] = (succeeded, result) | |||
self.finishedCount += 1 | |||
if not self.called: | |||
if succeeded == SUCCESS and self.fireOnOneCallback: | |||
self.callback((result, index)) | |||
elif succeeded == FAILURE and self.fireOnOneErrback: | |||
# We have modified this to fire the errback chain with the actual | |||
# Failure instance the originally occured rather than twisted's | |||
# FirstError which wraps the failure | |||
self.errback(result) | |||
elif self.finishedCount == len(self.resultList): | |||
self.callback(self.resultList) | |||
if succeeded == FAILURE and self.logErrors: | |||
log.err(result) | |||
if succeeded == FAILURE and self.consumeErrors: | |||
result = None | |||
return result | |||
Brian Granger
|
r1944 | ||
def wait_for_file(filename, delay=0.1, max_tries=10): | |||
"""Wait (poll) for a file to be created. | |||
This method returns a Deferred that will fire when a file exists. It | |||
works by polling os.path.isfile in time intervals specified by the | |||
delay argument. If `max_tries` is reached, it will errback with a | |||
`FileTimeoutError`. | |||
Parameters | |||
---------- | |||
filename : str | |||
The name of the file to wait for. | |||
delay : float | |||
The time to wait between polls. | |||
max_tries : int | |||
The max number of attempts before raising `FileTimeoutError` | |||
Returns | |||
------- | |||
d : Deferred | |||
A Deferred instance that will fire when the file exists. | |||
""" | |||
d = defer.Deferred() | |||
def _test_for_file(filename, attempt=0): | |||
if attempt >= max_tries: | |||
d.errback(FileTimeoutError( | |||
'timeout waiting for file to be created: %s' % filename | |||
)) | |||
else: | |||
if os.path.isfile(filename): | |||
d.callback(True) | |||
else: | |||
reactor.callLater(delay, _test_for_file, filename, attempt+1) | |||
_test_for_file(filename) | |||
return d | |||
Brian Granger
|
r2302 | ||
def sleep_deferred(seconds): | |||
"""Sleep without blocking the event loop.""" | |||
d = defer.Deferred() | |||
reactor.callLater(seconds, d.callback, seconds) | |||
return d | |||
Brian Granger
|
r2303 | ||
def make_deferred(func): | |||
"""A decorator that calls a function with :func`maybeDeferred`.""" | |||
def _wrapper(*args, **kwargs): | |||
return defer.maybeDeferred(func, *args, **kwargs) | |||
return _wrapper |