##// END OF EJS Templates
Fix another win32 test failure....
Fix another win32 test failure. As of this commit, the entire test suite passes on a system that only has: - Python 2.6.4 from Python.org - Pyreadline 1.5 - IPython (this branch) - nose 0.11.1 Tests that require Twisted, graphics or other machinery get skipped, but everything else runs and passes. The glaring remaining problem on win32 is that right now, colored output is broken for some reason (though prompts are OK). I'll try to fix that next.

File last commit:

r1112:bc7ad023 merge
r2456:3968d6e6
Show More
thread_ex.py
50 lines | 1.7 KiB | text/x-python | PythonLexer
"""
Thread subclass that can deal with asynchronously function calls via
raise_exc.
"""
import threading
import inspect
import ctypes
def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadEx(threading.Thread):
def _get_my_tid(self):
"""determines this (self's) thread id"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")
# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id
# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid
raise AssertionError("could not determine the thread's id")
def raise_exc(self, exctype):
"""raises the given exception type in the context of this thread"""
_async_raise(self._get_my_tid(), exctype)
def kill(self):
"""raises SystemExit in the context of the given thread, which should
cause the thread to exit silently (unless caught)"""
self.raise_exc(SystemExit)