# Standard library imports import signal import sys # System library imports from pygments.lexers import PythonLexer from PyQt4 import QtCore, QtGui # Local imports from IPython.core.inputsplitter import InputSplitter from IPython.frontend.qt.base_frontend_mixin import BaseFrontendMixin from IPython.utils.traitlets import Bool from bracket_matcher import BracketMatcher from call_tip_widget import CallTipWidget from completion_lexer import CompletionLexer from console_widget import HistoryConsoleWidget from pygments_highlighter import PygmentsHighlighter class FrontendHighlighter(PygmentsHighlighter): """ A PygmentsHighlighter that can be turned on and off and that ignores prompts. """ def __init__(self, frontend): super(FrontendHighlighter, self).__init__(frontend._control.document()) self._current_offset = 0 self._frontend = frontend self.highlighting_on = False def highlightBlock(self, qstring): """ Highlight a block of text. Reimplemented to highlight selectively. """ if not self.highlighting_on: return # The input to this function is unicode string that may contain # paragraph break characters, non-breaking spaces, etc. Here we acquire # the string as plain text so we can compare it. current_block = self.currentBlock() string = self._frontend._get_block_plain_text(current_block) # Decide whether to check for the regular or continuation prompt. if current_block.contains(self._frontend._prompt_pos): prompt = self._frontend._prompt else: prompt = self._frontend._continuation_prompt # Don't highlight the part of the string that contains the prompt. if string.startswith(prompt): self._current_offset = len(prompt) qstring.remove(0, len(prompt)) else: self._current_offset = 0 PygmentsHighlighter.highlightBlock(self, qstring) def rehighlightBlock(self, block): """ Reimplemented to temporarily enable highlighting if disabled. """ old = self.highlighting_on self.highlighting_on = True super(FrontendHighlighter, self).rehighlightBlock(block) self.highlighting_on = old def setFormat(self, start, count, format): """ Reimplemented to highlight selectively. """ start += self._current_offset PygmentsHighlighter.setFormat(self, start, count, format) class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): """ A Qt frontend for a generic Python kernel. """ # An option and corresponding signal for overriding the default kernel # interrupt behavior. custom_interrupt = Bool(False) custom_interrupt_requested = QtCore.pyqtSignal() # An option and corresponding signals for overriding the default kernel # restart behavior. custom_restart = Bool(False) custom_restart_kernel_died = QtCore.pyqtSignal(float) custom_restart_requested = QtCore.pyqtSignal() # Emitted when an 'execute_reply' has been received from the kernel and # processed by the FrontendWidget. executed = QtCore.pyqtSignal(object) # Protected class variables. _input_splitter_class = InputSplitter #--------------------------------------------------------------------------- # 'object' interface #--------------------------------------------------------------------------- def __init__(self, *args, **kw): super(FrontendWidget, self).__init__(*args, **kw) # FrontendWidget protected variables. self._bracket_matcher = BracketMatcher(self._control) self._call_tip_widget = CallTipWidget(self._control) self._completion_lexer = CompletionLexer(PythonLexer()) self._hidden = False self._highlighter = FrontendHighlighter(self) self._input_splitter = self._input_splitter_class(input_mode='block') self._kernel_manager = None self._possible_kernel_restart = False # Configure the ConsoleWidget. self.tab_width = 4 self._set_continuation_prompt('... ') # Connect signal handlers. document = self._control.document() document.contentsChange.connect(self._document_contents_change) #--------------------------------------------------------------------------- # 'ConsoleWidget' abstract interface #--------------------------------------------------------------------------- def _is_complete(self, source, interactive): """ Returns whether 'source' can be completely processed and a new prompt created. When triggered by an Enter/Return key press, 'interactive' is True; otherwise, it is False. """ complete = self._input_splitter.push(source.expandtabs(4)) if interactive: complete = not self._input_splitter.push_accepts_more() return complete def _execute(self, source, hidden, user_variables=None, user_expressions=None): """ Execute 'source'. If 'hidden', do not show any output. See parent class :meth:`execute` docstring for full details. """ # tmp code for testing, disable in real use with 'if 0'. Only delete # this code once we have automated tests for these fields. if 0: user_variables = ['x', 'y', 'z'] user_expressions = {'sum' : '1+1', 'bad syntax' : 'klsdafj kasd f', 'bad call' : 'range("hi")', 'time' : 'time.time()', } # /end tmp code # FIXME - user_variables/expressions are not visible in API above us. self.kernel_manager.xreq_channel.execute(source, hidden, user_variables, user_expressions) self._hidden = hidden def _prompt_started_hook(self): """ Called immediately after a new prompt is displayed. """ if not self._reading: self._highlighter.highlighting_on = True def _prompt_finished_hook(self): """ Called immediately after a prompt is finished, i.e. when some input will be processed and a new prompt displayed. """ if not self._reading: self._highlighter.highlighting_on = False def _tab_pressed(self): """ Called when the tab key is pressed. Returns whether to continue processing the event. """ # Perform tab completion if: # 1) The cursor is in the input buffer. # 2) There is a non-whitespace character before the cursor. text = self._get_input_buffer_cursor_line() if text is None: return False complete = bool(text[:self._get_input_buffer_cursor_column()].strip()) if complete: self._complete() return not complete #--------------------------------------------------------------------------- # 'ConsoleWidget' protected interface #--------------------------------------------------------------------------- def _event_filter_console_keypress(self, event): """ Reimplemented to allow execution interruption. """ key = event.key() if self._control_key_down(event.modifiers()): if key == QtCore.Qt.Key_C and self._executing: self.interrupt_kernel() return True elif key == QtCore.Qt.Key_Period: message = 'Are you sure you want to restart the kernel?' self.restart_kernel(message) return True return super(FrontendWidget, self)._event_filter_console_keypress(event) def _insert_continuation_prompt(self, cursor): """ Reimplemented for auto-indentation. """ super(FrontendWidget, self)._insert_continuation_prompt(cursor) spaces = self._input_splitter.indent_spaces cursor.insertText('\t' * (spaces / self.tab_width)) cursor.insertText(' ' * (spaces % self.tab_width)) #--------------------------------------------------------------------------- # 'BaseFrontendMixin' abstract interface #--------------------------------------------------------------------------- def _handle_complete_reply(self, rep): """ Handle replies for tab completion. """ cursor = self._get_cursor() if rep['parent_header']['msg_id'] == self._complete_id and \ cursor.position() == self._complete_pos: text = '.'.join(self._get_context()) cursor.movePosition(QtGui.QTextCursor.Left, n=len(text)) self._complete_with_items(cursor, rep['content']['matches']) def _handle_execute_reply(self, msg): """ Handles replies for code execution. """ if not self._hidden: # Make sure that all output from the SUB channel has been processed # before writing a new prompt. self.kernel_manager.sub_channel.flush() content = msg['content'] status = content['status'] if status == 'ok': self._process_execute_ok(msg) elif status == 'error': self._process_execute_error(msg) elif status == 'abort': self._process_execute_abort(msg) self._show_interpreter_prompt_for_reply(msg) self.executed.emit(msg) def _handle_input_request(self, msg): """ Handle requests for raw_input. """ if self._hidden: raise RuntimeError('Request for raw input during hidden execution.') # Make sure that all output from the SUB channel has been processed # before entering readline mode. self.kernel_manager.sub_channel.flush() def callback(line): self.kernel_manager.rep_channel.input(line) self._readline(msg['content']['prompt'], callback=callback) def _handle_kernel_died(self, since_last_heartbeat): """ Handle the kernel's death by asking if the user wants to restart. """ message = 'The kernel heartbeat has been inactive for %.2f ' \ 'seconds. Do you want to restart the kernel? You may ' \ 'first want to check the network connection.' % \ since_last_heartbeat if self.custom_restart: self.custom_restart_kernel_died.emit(since_last_heartbeat) else: self.restart_kernel(message) def _handle_object_info_reply(self, rep): """ Handle replies for call tips. """ cursor = self._get_cursor() if rep['parent_header']['msg_id'] == self._call_tip_id and \ cursor.position() == self._call_tip_pos: doc = rep['content']['docstring'] if doc: self._call_tip_widget.show_docstring(doc) def _handle_pyout(self, msg): """ Handle display hook output. """ if not self._hidden and self._is_from_this_session(msg): self._append_plain_text(msg['content']['data'] + '\n') def _handle_stream(self, msg): """ Handle stdout, stderr, and stdin. """ if not self._hidden and self._is_from_this_session(msg): self._append_plain_text(msg['content']['data']) self._control.moveCursor(QtGui.QTextCursor.End) def _started_channels(self): """ Called when the KernelManager channels have started listening or when the frontend is assigned an already listening KernelManager. """ self._control.clear() self._append_plain_text(self._get_banner()) self._show_interpreter_prompt() def _stopped_channels(self): """ Called when the KernelManager channels have stopped listening or when a listening KernelManager is removed from the frontend. """ self._executing = self._reading = False self._highlighter.highlighting_on = False #--------------------------------------------------------------------------- # 'FrontendWidget' interface #--------------------------------------------------------------------------- def execute_file(self, path, hidden=False): """ Attempts to execute file with 'path'. If 'hidden', no output is shown. """ self.execute('execfile("%s")' % path, hidden=hidden) def interrupt_kernel(self): """ Attempts to interrupt the running kernel. """ if self.custom_interrupt: self.custom_interrupt_requested.emit() elif self.kernel_manager.has_kernel: self.kernel_manager.signal_kernel(signal.SIGINT) else: self._append_plain_text('Kernel process is either remote or ' 'unspecified. Cannot interrupt.\n') def restart_kernel(self, message): """ Attempts to restart the running kernel. """ # We want to make sure that if this dialog is already happening, that # other signals don't trigger it again. This can happen when the # kernel_died heartbeat signal is emitted and the user is slow to # respond to the dialog. if not self._possible_kernel_restart: if self.custom_restart: self.custom_restart_requested.emit() elif self.kernel_manager.has_kernel: # Setting this to True will prevent this logic from happening # again until the current pass is completed. self._possible_kernel_restart = True buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No result = QtGui.QMessageBox.question(self, 'Restart kernel?', message, buttons) if result == QtGui.QMessageBox.Yes: try: self.kernel_manager.restart_kernel() except RuntimeError: message = 'Kernel started externally. Cannot restart.\n' self._append_plain_text(message) else: self._stopped_channels() self._append_plain_text('Kernel restarting...\n') self._show_interpreter_prompt() # This might need to be moved to another location? self._possible_kernel_restart = False else: self._append_plain_text('Kernel process is either remote or ' 'unspecified. Cannot restart.\n') #--------------------------------------------------------------------------- # 'FrontendWidget' protected interface #--------------------------------------------------------------------------- def _call_tip(self): """ Shows a call tip, if appropriate, at the current cursor location. """ # Decide if it makes sense to show a call tip cursor = self._get_cursor() cursor.movePosition(QtGui.QTextCursor.Left) if cursor.document().characterAt(cursor.position()).toAscii() != '(': return False context = self._get_context(cursor) if not context: return False # Send the metadata request to the kernel name = '.'.join(context) self._call_tip_id = self.kernel_manager.xreq_channel.object_info(name) self._call_tip_pos = self._get_cursor().position() return True def _complete(self): """ Performs completion at the current cursor location. """ context = self._get_context() if context: # Send the completion request to the kernel self._complete_id = self.kernel_manager.xreq_channel.complete( '.'.join(context), # text self._get_input_buffer_cursor_line(), # line self._get_input_buffer_cursor_column(), # cursor_pos self.input_buffer) # block self._complete_pos = self._get_cursor().position() def _get_banner(self): """ Gets a banner to display at the beginning of a session. """ banner = 'Python %s on %s\nType "help", "copyright", "credits" or ' \ '"license" for more information.' return banner % (sys.version, sys.platform) def _get_context(self, cursor=None): """ Gets the context for the specified cursor (or the current cursor if none is specified). """ if cursor is None: cursor = self._get_cursor() cursor.movePosition(QtGui.QTextCursor.StartOfBlock, QtGui.QTextCursor.KeepAnchor) text = str(cursor.selection().toPlainText()) return self._completion_lexer.get_context(text) def _process_execute_abort(self, msg): """ Process a reply for an aborted execution request. """ self._append_plain_text("ERROR: execution aborted\n") def _process_execute_error(self, msg): """ Process a reply for an execution request that resulted in an error. """ content = msg['content'] traceback = ''.join(content['traceback']) self._append_plain_text(traceback) def _process_execute_ok(self, msg): """ Process a reply for a successful execution equest. """ payload = msg['content']['payload'] for item in payload: if not self._process_execute_payload(item): warning = 'Received unknown payload of type %s\n' self._append_plain_text(warning % repr(item['source'])) def _process_execute_payload(self, item): """ Process a single payload item from the list of payload items in an execution reply. Returns whether the payload was handled. """ # The basic FrontendWidget doesn't handle payloads, as they are a # mechanism for going beyond the standard Python interpreter model. return False def _show_interpreter_prompt(self): """ Shows a prompt for the interpreter. """ self._show_prompt('>>> ') def _show_interpreter_prompt_for_reply(self, msg): """ Shows a prompt for the interpreter given an 'execute_reply' message. """ self._show_interpreter_prompt() #------ Signal handlers ---------------------------------------------------- def _document_contents_change(self, position, removed, added): """ Called whenever the document's content changes. Display a call tip if appropriate. """ # Calculate where the cursor should be *after* the change: position += added document = self._control.document() if position == self._get_cursor().position(): self._call_tip()