##// END OF EJS Templates
The refactoring of the Task system is nearly complete. Now there are...
The refactoring of the Task system is nearly complete. Now there are multiple types of tasks including `StringTask` and `MapTask`. Each task type is responsible for running itself and processing its own result. This makes it much easier for people to create new task types. Also, the map and parallel function support has been completely refactored and improved. This includes a map and parallel function implementation for the task controller as well as a @parallel decorator.

File last commit:

r1171:ea2c6247
r1395:1feaf0a3
Show More
twshell.py
278 lines | 9.5 KiB | text/x-python | PythonLexer
import sys
from twisted.internet import reactor, threads
from IPython.ipmaker import make_IPython
from IPython.iplib import InteractiveShell
from IPython.ipstruct import Struct
import Queue,thread,threading,signal
from signal import signal, SIGINT
from IPython.genutils import Term,warn,error,flag_calls, ask_yes_no
import shellglobals
def install_gtk2():
""" Install gtk2 reactor, needs to be called bef """
from twisted.internet import gtk2reactor
gtk2reactor.install()
def hijack_reactor():
"""Modifies Twisted's reactor with a dummy so user code does
not block IPython. This function returns the original
'twisted.internet.reactor' that has been hijacked.
NOTE: Make sure you call this *AFTER* you've installed
the reactor of your choice.
"""
from twisted import internet
orig_reactor = internet.reactor
class DummyReactor(object):
def run(self):
pass
def __getattr__(self, name):
return getattr(orig_reactor, name)
def __setattr__(self, name, value):
return setattr(orig_reactor, name, value)
internet.reactor = DummyReactor()
return orig_reactor
class TwistedInteractiveShell(InteractiveShell):
"""Simple multi-threaded shell."""
# Threading strategy taken from:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65109, by Brian
# McErlean and John Finlay. Modified with corrections by Antoon Pardon,
# from the pygtk mailing list, to avoid lockups with system calls.
# class attribute to indicate whether the class supports threads or not.
# Subclasses with thread support should override this as needed.
isthreaded = True
def __init__(self,name,usage=None,rc=Struct(opts=None,args=None),
user_ns=None,user_global_ns=None,banner2='',**kw):
"""Similar to the normal InteractiveShell, but with threading control"""
InteractiveShell.__init__(self,name,usage,rc,user_ns,
user_global_ns,banner2)
# A queue to hold the code to be executed.
self.code_queue = Queue.Queue()
# Stuff to do at closing time
self._kill = None
on_kill = kw.get('on_kill', [])
# Check that all things to kill are callable:
for t in on_kill:
if not callable(t):
raise TypeError,'on_kill must be a list of callables'
self.on_kill = on_kill
# thread identity of the "worker thread" (that may execute code directly)
self.worker_ident = None
self.reactor_started = False
self.first_run = True
def runsource(self, source, filename="<input>", symbol="single"):
"""Compile and run some source in the interpreter.
Modified version of code.py's runsource(), to handle threading issues.
See the original for full docstring details."""
# If Ctrl-C was typed, we reset the flag and return right away
if shellglobals.KBINT:
shellglobals.KBINT = False
return False
if self._kill:
# can't queue new code if we are being killed
return True
try:
code = self.compile(source, filename, symbol)
except (OverflowError, SyntaxError, ValueError):
# Case 1
self.showsyntaxerror(filename)
return False
if code is None:
# Case 2
return True
# shortcut - if we are in worker thread, or the worker thread is not running,
# execute directly (to allow recursion and prevent deadlock if code is run early
# in IPython construction)
if (not self.reactor_started or (self.worker_ident is None and not self.first_run)
or self.worker_ident == thread.get_ident() or shellglobals.run_in_frontend(source)):
InteractiveShell.runcode(self,code)
return
# Case 3
# Store code in queue, so the execution thread can handle it.
self.first_run = False
completed_ev, received_ev = threading.Event(), threading.Event()
self.code_queue.put((code,completed_ev, received_ev))
reactor.callLater(0.0,self.runcode)
received_ev.wait(5)
if not received_ev.isSet():
# the mainloop is dead, start executing code directly
print "Warning: Timeout for mainloop thread exceeded"
print "switching to nonthreaded mode (until mainloop wakes up again)"
self.worker_ident = None
else:
completed_ev.wait()
return False
def runcode(self):
"""Execute a code object.
Multithreaded wrapper around IPython's runcode()."""
# we are in worker thread, stash out the id for runsource()
self.worker_ident = thread.get_ident()
if self._kill:
print >>Term.cout, 'Closing threads...',
Term.cout.flush()
for tokill in self.on_kill:
tokill()
print >>Term.cout, 'Done.'
# allow kill() to return
self._kill.set()
return True
# Install SIGINT handler. We do it every time to ensure that if user
# code modifies it, we restore our own handling.
try:
pass
signal(SIGINT,shellglobals.sigint_handler)
except SystemError:
# This happens under Windows, which seems to have all sorts
# of problems with signal handling. Oh well...
pass
# Flush queue of pending code by calling the run methood of the parent
# class with all items which may be in the queue.
code_to_run = None
while 1:
try:
code_to_run, completed_ev, received_ev = self.code_queue.get_nowait()
except Queue.Empty:
break
received_ev.set()
# Exceptions need to be raised differently depending on which
# thread is active. This convoluted try/except is only there to
# protect against asynchronous exceptions, to ensure that a shellglobals.KBINT
# at the wrong time doesn't deadlock everything. The global
# CODE_TO_RUN is set to true/false as close as possible to the
# runcode() call, so that the KBINT handler is correctly informed.
try:
try:
shellglobals.CODE_RUN = True
InteractiveShell.runcode(self,code_to_run)
except KeyboardInterrupt:
print "Keyboard interrupted in mainloop"
while not self.code_queue.empty():
code = self.code_queue.get_nowait()
break
finally:
shellglobals.CODE_RUN = False
# allow runsource() return from wait
completed_ev.set()
# This MUST return true for gtk threading to work
return True
def kill(self):
"""Kill the thread, returning when it has been shut down."""
self._kill = threading.Event()
reactor.callLater(0.0,self.runcode)
self._kill.wait()
class IPShellTwisted:
"""Run a Twisted reactor while in an IPython session.
Python commands can be passed to the thread where they will be
executed. This is implemented by periodically checking for
passed code using a Twisted reactor callback.
"""
TIMEOUT = 0.01 # Millisecond interval between reactor runs.
def __init__(self, argv=None, user_ns=None, debug=1,
shell_class=TwistedInteractiveShell):
from twisted.internet import reactor
self.reactor = hijack_reactor()
mainquit = self.reactor.stop
# Make sure IPython keeps going after reactor stop.
def reactorstop():
pass
self.reactor.stop = reactorstop
reactorrun_orig = self.reactor.run
self.quitting = False
def reactorrun():
while True and not self.quitting:
reactorrun_orig()
self.reactor.run = reactorrun
self.IP = make_IPython(argv, user_ns=user_ns, debug=debug,
shell_class=shell_class,
on_kill=[mainquit])
# threading.Thread.__init__(self)
def run(self):
self.IP.mainloop()
self.quitting = True
self.IP.kill()
def mainloop(self):
def mainLoopThreadDeath(r):
print "mainLoopThreadDeath: ", str(r)
def spawnMainloopThread():
d=threads.deferToThread(self.run)
d.addBoth(mainLoopThreadDeath)
reactor.callWhenRunning(spawnMainloopThread)
self.IP.reactor_started = True
self.reactor.run()
print "mainloop ending...."
exists = True
if __name__ == '__main__':
# Sample usage.
# Create the shell object. This steals twisted.internet.reactor
# for its own purposes, to make sure you've already installed a
# reactor of your choice.
shell = IPShellTwisted(
argv=[],
user_ns={'__name__': '__example__',
'hello': 'world',
},
)
# Run the mainloop. This runs the actual reactor.run() method.
# The twisted.internet.reactor object at this point is a dummy
# object that passes through to the actual reactor, but prevents
# run() from being called on it again.
shell.mainloop()
# You must exit IPython to terminate your program.
print 'Goodbye!'