##// END OF EJS Templates
match log format in Scheduler to rest of parallel apps
match log format in Scheduler to rest of parallel apps

File last commit:

r4872:34c10438
r6356:cc6aa306
Show More
twshell.py
287 lines | 9.6 KiB | text/x-python | PythonLexer
"""Twisted shell support.
XXX - This module is missing proper docs.
"""
# Tell nose to skip this module
__test__ = {}
import sys
from twisted.internet import reactor, threads
from IPython.core.ipmaker import make_IPython
from IPython.core.iplib import InteractiveShell
from IPython.utils.ipstruct import Struct
import Queue,thread,threading,signal
from signal import signal, SIGINT
import IPython.utils.io, ask_yes_no
from IPython.utils.warn import warn, error
from IPython.utils.decorators import flag_calls
from IPython.core import shellglobals
def install_gtk2():
""" Install gtk2 reactor, needs to be called bef """
from twisted.internet import gtk2reactor
gtk2reactor.install()
def hijack_reactor():
"""Modifies Twisted's reactor with a dummy so user code does
not block IPython. This function returns the original
'twisted.internet.reactor' that has been hijacked.
NOTE: Make sure you call this *AFTER* you've installed
the reactor of your choice.
"""
from twisted import internet
orig_reactor = internet.reactor
class DummyReactor(object):
def run(self):
pass
def __getattr__(self, name):
return getattr(orig_reactor, name)
def __setattr__(self, name, value):
return setattr(orig_reactor, name, value)
internet.reactor = DummyReactor()
return orig_reactor
class TwistedInteractiveShell(InteractiveShell):
"""Simple multi-threaded shell."""
# Threading strategy taken from:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65109, by Brian
# McErlean and John Finlay. Modified with corrections by Antoon Pardon,
# from the pygtk mailing list, to avoid lockups with system calls.
# class attribute to indicate whether the class supports threads or not.
# Subclasses with thread support should override this as needed.
isthreaded = True
def __init__(self,name,usage=None,rc=Struct(opts=None,args=None),
user_ns=None,user_global_ns=None,banner2='',**kw):
"""Similar to the normal InteractiveShell, but with threading control"""
InteractiveShell.__init__(self,name,usage,rc,user_ns,
user_global_ns,banner2)
# A queue to hold the code to be executed.
self.code_queue = Queue.Queue()
# Stuff to do at closing time
self._kill = None
on_kill = kw.get('on_kill', [])
# Check that all things to kill are callable:
for t in on_kill:
if not callable(t):
raise TypeError,'on_kill must be a list of callables'
self.on_kill = on_kill
# thread identity of the "worker thread" (that may execute code directly)
self.worker_ident = None
self.reactor_started = False
self.first_run = True
def runsource(self, source, filename="<input>", symbol="single"):
"""Compile and run some source in the interpreter.
Modified version of code.py's runsource(), to handle threading issues.
See the original for full docstring details."""
# If Ctrl-C was typed, we reset the flag and return right away
if shellglobals.KBINT:
shellglobals.KBINT = False
return False
if self._kill:
# can't queue new code if we are being killed
return True
try:
code = self.compile(source, filename, symbol)
except (OverflowError, SyntaxError, ValueError):
# Case 1
self.showsyntaxerror(filename)
return False
if code is None:
# Case 2
return True
# shortcut - if we are in worker thread, or the worker thread is not running,
# execute directly (to allow recursion and prevent deadlock if code is run early
# in IPython construction)
if (not self.reactor_started or (self.worker_ident is None and not self.first_run)
or self.worker_ident == thread.get_ident() or shellglobals.run_in_frontend(source)):
InteractiveShell.runcode(self,code)
return
# Case 3
# Store code in queue, so the execution thread can handle it.
self.first_run = False
completed_ev, received_ev = threading.Event(), threading.Event()
self.code_queue.put((code,completed_ev, received_ev))
reactor.callLater(0.0,self.runcode)
received_ev.wait(5)
if not received_ev.isSet():
# the mainloop is dead, start executing code directly
print "Warning: Timeout for mainloop thread exceeded"
print "switching to nonthreaded mode (until mainloop wakes up again)"
self.worker_ident = None
else:
completed_ev.wait()
return False
def runcode(self):
"""Execute a code object.
Multithreaded wrapper around IPython's runcode()."""
# we are in worker thread, stash out the id for runsource()
self.worker_ident = thread.get_ident()
if self._kill:
print >>Term.cout, 'Closing threads...',
Term.cout.flush()
for tokill in self.on_kill:
tokill()
print >>Term.cout, 'Done.'
# allow kill() to return
self._kill.set()
return True
# Install SIGINT handler. We do it every time to ensure that if user
# code modifies it, we restore our own handling.
try:
pass
signal(SIGINT,shellglobals.sigint_handler)
except SystemError:
# This happens under Windows, which seems to have all sorts
# of problems with signal handling. Oh well...
pass
# Flush queue of pending code by calling the run methood of the parent
# class with all items which may be in the queue.
code_to_run = None
while 1:
try:
code_to_run, completed_ev, received_ev = self.code_queue.get_nowait()
except Queue.Empty:
break
received_ev.set()
# Exceptions need to be raised differently depending on which
# thread is active. This convoluted try/except is only there to
# protect against asynchronous exceptions, to ensure that a shellglobals.KBINT
# at the wrong time doesn't deadlock everything. The global
# CODE_TO_RUN is set to true/false as close as possible to the
# runcode() call, so that the KBINT handler is correctly informed.
try:
try:
shellglobals.CODE_RUN = True
InteractiveShell.runcode(self,code_to_run)
except KeyboardInterrupt:
print "Keyboard interrupted in mainloop"
while not self.code_queue.empty():
code = self.code_queue.get_nowait()
break
finally:
shellglobals.CODE_RUN = False
# allow runsource() return from wait
completed_ev.set()
# This MUST return true for gtk threading to work
return True
def kill(self):
"""Kill the thread, returning when it has been shut down."""
self._kill = threading.Event()
reactor.callLater(0.0,self.runcode)
self._kill.wait()
class IPShellTwisted:
"""Run a Twisted reactor while in an IPython session.
Python commands can be passed to the thread where they will be
executed. This is implemented by periodically checking for
passed code using a Twisted reactor callback.
"""
TIMEOUT = 0.01 # Millisecond interval between reactor runs.
def __init__(self, argv=None, user_ns=None, debug=1,
shell_class=TwistedInteractiveShell):
from twisted.internet import reactor
self.reactor = hijack_reactor()
mainquit = self.reactor.stop
# Make sure IPython keeps going after reactor stop.
def reactorstop():
pass
self.reactor.stop = reactorstop
reactorrun_orig = self.reactor.run
self.quitting = False
def reactorrun():
while True and not self.quitting:
reactorrun_orig()
self.reactor.run = reactorrun
self.IP = make_IPython(argv, user_ns=user_ns, debug=debug,
shell_class=shell_class,
on_kill=[mainquit])
# threading.Thread.__init__(self)
def run(self):
self.IP.mainloop()
self.quitting = True
self.IP.kill()
def mainloop(self):
def mainLoopThreadDeath(r):
print "mainLoopThreadDeath: ", str(r)
def spawnMainloopThread():
d=threads.deferToThread(self.run)
d.addBoth(mainLoopThreadDeath)
reactor.callWhenRunning(spawnMainloopThread)
self.IP.reactor_started = True
self.reactor.run()
print "mainloop ending...."
exists = True
if __name__ == '__main__':
# Sample usage.
# Create the shell object. This steals twisted.internet.reactor
# for its own purposes, to make sure you've already installed a
# reactor of your choice.
shell = IPShellTwisted(
argv=[],
user_ns={'__name__': '__example__',
'hello': 'world',
},
)
# Run the mainloop. This runs the actual reactor.run() method.
# The twisted.internet.reactor object at this point is a dummy
# object that passes through to the actual reactor, but prevents
# run() from being called on it again.
shell.mainloop()
# You must exit IPython to terminate your program.
print 'Goodbye!'