##// END OF EJS Templates
Initial checkin of Qt kernel manager. Began refactor of FrontendWidget.
Initial checkin of Qt kernel manager. Began refactor of FrontendWidget.

File last commit:

r2609:083532d0
r2609:083532d0
Show More
frontend_widget.py
391 lines | 13.3 KiB | text/x-python | PythonLexer
# Standard library imports
from codeop import CommandCompiler
from threading import Thread
import time
import types
# System library imports
from pygments.lexers import PythonLexer
from PyQt4 import QtCore, QtGui
import zmq
# IPython imports.
from IPython.zmq.session import Message, Session
# Local imports
from call_tip_widget import CallTipWidget
from completion_lexer import CompletionLexer
from console_widget import HistoryConsoleWidget
from pygments_highlighter import PygmentsHighlighter
class FrontendReplyThread(Thread, QtCore.QObject):
""" A Thread that receives a reply from the kernel for the frontend.
"""
finished = QtCore.pyqtSignal()
output_received = QtCore.pyqtSignal(Message)
reply_received = QtCore.pyqtSignal(Message)
def __init__(self, parent):
""" Create a FrontendReplyThread for the specified frontend.
"""
assert isinstance(parent, FrontendWidget)
QtCore.QObject.__init__(self, parent)
Thread.__init__(self)
self.sleep_time = 0.05
def run(self):
""" The starting point for the thread.
"""
frontend = self.parent()
while True:
rep = frontend._recv_reply()
if rep is not None:
self._recv_output()
self.reply_received.emit(rep)
break
self._recv_output()
time.sleep(self.sleep_time)
self.finished.emit()
def _recv_output(self):
""" Send any output to the frontend.
"""
frontend = self.parent()
omsgs = frontend._recv_output()
for omsg in omsgs:
self.output_received.emit(omsg)
class FrontendHighlighter(PygmentsHighlighter):
""" A Python PygmentsHighlighter that can be turned on and off and which
knows about continuation prompts.
"""
def __init__(self, frontend):
PygmentsHighlighter.__init__(self, frontend.document(), PythonLexer())
self._current_offset = 0
self._frontend = frontend
self.highlighting_on = False
def highlightBlock(self, qstring):
""" Highlight a block of text. Reimplemented to highlight selectively.
"""
if self.highlighting_on:
for prompt in (self._frontend._prompt,
self._frontend.continuation_prompt):
if qstring.startsWith(prompt):
qstring.remove(0, len(prompt))
self._current_offset = len(prompt)
break
PygmentsHighlighter.highlightBlock(self, qstring)
def setFormat(self, start, count, format):
""" Reimplemented to avoid highlighting continuation prompts.
"""
start += self._current_offset
PygmentsHighlighter.setFormat(self, start, count, format)
class FrontendWidget(HistoryConsoleWidget):
""" A Qt frontend for an IPython kernel.
"""
# Emitted when an 'execute_reply' is received from the kernel.
executed = QtCore.pyqtSignal(Message)
#---------------------------------------------------------------------------
# 'QWidget' interface
#---------------------------------------------------------------------------
def __init__(self, kernel_manager, parent=None):
super(FrontendWidget, self).__init__(parent)
self._call_tip_widget = CallTipWidget(self)
self._compile = CommandCompiler()
self._completion_lexer = CompletionLexer(PythonLexer())
self._highlighter = FrontendHighlighter(self)
self.document().contentsChange.connect(self._document_contents_change)
self.continuation_prompt = '... '
self.kernel_manager = kernel_manager
def focusOutEvent(self, event):
""" Reimplemented to hide calltips.
"""
self._call_tip_widget.hide()
return super(FrontendWidget, self).focusOutEvent(event)
def keyPressEvent(self, event):
""" Reimplemented to hide calltips.
"""
if event.key() == QtCore.Qt.Key_Escape:
self._call_tip_widget.hide()
return super(FrontendWidget, self).keyPressEvent(event)
#---------------------------------------------------------------------------
# 'ConsoleWidget' abstract interface
#---------------------------------------------------------------------------
def _execute(self, interactive):
""" Called to execute the input buffer. When triggered by an the enter
key press, 'interactive' is True; otherwise, it is False. Returns
whether the input buffer was completely processed and a new prompt
created.
"""
return self.execute_source(self.input_buffer, interactive=interactive)
def _prompt_started_hook(self):
""" Called immediately after a new prompt is displayed.
"""
self._highlighter.highlighting_on = True
def _prompt_finished_hook(self):
""" Called immediately after a prompt is finished, i.e. when some input
will be processed and a new prompt displayed.
"""
self._highlighter.highlighting_on = False
def _tab_pressed(self):
""" Called when the tab key is pressed. Returns whether to continue
processing the event.
"""
self._keep_cursor_in_buffer()
cursor = self.textCursor()
if not self._complete():
cursor.insertText(' ')
return False
#---------------------------------------------------------------------------
# 'FrontendWidget' interface
#---------------------------------------------------------------------------
def execute_source(self, source, hidden=False, interactive=False):
""" Execute a string containing Python code. If 'hidden', no output is
shown. Returns whether the source executed (i.e., returns True only
if no more input is necessary).
"""
try:
code = self._compile(source, symbol='single')
except (OverflowError, SyntaxError, ValueError):
# Just let IPython deal with the syntax error.
code = Exception
# Only execute interactive multiline input if it ends with a blank line
lines = source.splitlines()
if interactive and len(lines) > 1 and lines[-1].strip() != '':
code = None
executed = code is not None
if executed:
msg = self.session.send(self.request_socket, 'execute_request',
dict(code=source))
thread = FrontendReplyThread(self)
if not hidden:
thread.output_received.connect(self._handle_output)
thread.reply_received.connect(self._handle_reply)
thread.finished.connect(thread.deleteLater)
thread.start()
else:
space = 0
for char in lines[-1]:
if char == '\t':
space += 4
elif char == ' ':
space += 1
else:
break
if source.endswith(':') or source.endswith(':\n'):
space += 4
self._show_continuation_prompt()
self.appendPlainText(' ' * space)
return executed
def execute_file(self, path, hidden=False):
""" Attempts to execute file with 'path'. If 'hidden', no output is
shown.
"""
self.execute_source('run %s' % path, hidden=hidden)
def _get_kernel_manager(self):
""" Returns the current kernel manager.
"""
return self._kernel_manager
def _set_kernel_manager(self, kernel_manager):
""" Sets a new kernel manager, configuring its channels as necessary.
"""
self._kernel_manager = kernel_manager
self._sub_channel = kernel_manager.get_sub_channel()
self._xreq_channel = kernel_manager.get_xreq_channel()
kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
#---------------------------------------------------------------------------
# 'FrontendWidget' protected interface
#---------------------------------------------------------------------------
def _call_tip(self):
""" Shows a call tip, if appropriate, at the current cursor location.
"""
# Decide if it makes sense to show a call tip
cursor = self.textCursor()
cursor.movePosition(QtGui.QTextCursor.Left)
document = self.document()
if document.characterAt(cursor.position()).toAscii() != '(':
return False
context = self._get_context(cursor)
if not context:
return False
# Send the metadata request to the kernel
text = '.'.join(context)
msg = self.session.send(self.request_socket, 'metadata_request',
dict(context=text))
# Give the kernel some time to respond
rep = self._recv_reply_now('metadata_reply')
doc = rep.content.docstring if rep else ''
# Show the call tip
if doc:
self._call_tip_widget.show_tip(doc)
return True
def _complete(self):
""" Performs completion at the current cursor location.
"""
# Decide if it makes sense to do completion
context = self._get_context()
if not context:
return False
# Send the completion request to the kernel
text = '.'.join(context)
line = self.input_buffer_cursor_line
msg = self.session.send(self.request_socket, 'complete_request',
dict(text=text, line=line))
# Give the kernel some time to respond
rep = self._recv_reply_now('complete_reply')
matches = rep.content.matches if rep else []
# Show the completion at the correct location
cursor = self.textCursor()
cursor.movePosition(QtGui.QTextCursor.Left, n=len(text))
self._complete_with_items(cursor, matches)
return True
def _kernel_connected(self):
""" Called when the frontend is connected to a kernel.
"""
self._show_prompt('>>> ')
def _get_context(self, cursor=None):
""" Gets the context at the current cursor location.
"""
if cursor is None:
cursor = self.textCursor()
cursor.movePosition(QtGui.QTextCursor.StartOfLine,
QtGui.QTextCursor.KeepAnchor)
text = unicode(cursor.selectedText())
return self._completion_lexer.get_context(text)
#------ Signal handlers ----------------------------------------------------
def _document_contents_change(self, position, removed, added):
""" Called whenever the document's content changes. Display a calltip
if appropriate.
"""
# Calculate where the cursor should be *after* the change:
position += added
document = self.document()
if position == self.textCursor().position():
self._call_tip()
def _handle_output(self, omsg):
handler = getattr(self, '_handle_%s' % omsg.msg_type, None)
if handler is not None:
handler(omsg)
def _handle_pyout(self, omsg):
if omsg.parent_header.session == self.session.session:
self.appendPlainText(omsg.content.data + '\n')
def _handle_stream(self, omsg):
self.appendPlainText(omsg.content.data)
def _handle_reply(self, rep):
if rep is not None:
if rep.msg_type == 'execute_reply':
if rep.content.status == 'error':
self.appendPlainText(rep.content.traceback[-1])
elif rep.content.status == 'aborted':
text = "ERROR: ABORTED\n"
ab = self.messages[rep.parent_header.msg_id].content
if 'code' in ab:
text += ab.code
else:
text += ab
self.appendPlainText(text)
self._show_prompt('>>> ')
self.executed.emit(rep)
#------ Communication methods ----------------------------------------------
def _recv_output(self):
omsgs = []
while True:
omsg = self.session.recv(self.sub_socket)
if omsg is None:
break
else:
omsgs.append(omsg)
return omsgs
def _recv_reply(self):
return self.session.recv(self.request_socket)
def _recv_reply_now(self, msg_type):
for i in xrange(5):
rep = self._recv_reply()
if rep is not None and rep.msg_type == msg_type:
return rep
time.sleep(0.1)
return None
if __name__ == '__main__':
import sys
# Defaults
ip = '127.0.0.1'
port_base = 5555
connection = ('tcp://%s' % ip) + ':%i'
req_conn = connection % port_base
sub_conn = connection % (port_base+1)
# Create initial sockets
c = zmq.Context()
request_socket = c.socket(zmq.XREQ)
request_socket.connect(req_conn)
sub_socket = c.socket(zmq.SUB)
sub_socket.connect(sub_conn)
sub_socket.setsockopt(zmq.SUBSCRIBE, '')
# Launch application
app = QtGui.QApplication(sys.argv)
widget = FrontendWidget(request_socket=request_socket,
sub_socket=sub_socket)
widget.setWindowTitle('Python')
widget.resize(640, 480)
widget.show()
sys.exit(app.exec_())