From 710f79b2759eefd326ae69e5617522b2504502f4 2010-07-14 17:41:21 From: epatters Date: 2010-07-14 17:41:21 Subject: [PATCH] * Refactored KernelManager to use Traitlets and to have its channels as attributes * QtSubChannel and QtXReqChannel work now * Progress on refactoring FrontendWidget to use KernelManager. --- diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index 6ee43f5..60865ed 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -9,9 +9,6 @@ from pygments.lexers import PythonLexer from PyQt4 import QtCore, QtGui import zmq -# IPython imports. -from IPython.zmq.session import Message, Session - # Local imports from call_tip_widget import CallTipWidget from completion_lexer import CompletionLexer @@ -19,48 +16,6 @@ from console_widget import HistoryConsoleWidget from pygments_highlighter import PygmentsHighlighter -class FrontendReplyThread(Thread, QtCore.QObject): - """ A Thread that receives a reply from the kernel for the frontend. - """ - - finished = QtCore.pyqtSignal() - output_received = QtCore.pyqtSignal(Message) - reply_received = QtCore.pyqtSignal(Message) - - def __init__(self, parent): - """ Create a FrontendReplyThread for the specified frontend. - """ - assert isinstance(parent, FrontendWidget) - QtCore.QObject.__init__(self, parent) - Thread.__init__(self) - - self.sleep_time = 0.05 - - def run(self): - """ The starting point for the thread. - """ - frontend = self.parent() - while True: - rep = frontend._recv_reply() - if rep is not None: - self._recv_output() - self.reply_received.emit(rep) - break - - self._recv_output() - time.sleep(self.sleep_time) - - self.finished.emit() - - def _recv_output(self): - """ Send any output to the frontend. - """ - frontend = self.parent() - omsgs = frontend._recv_output() - for omsg in omsgs: - self.output_received.emit(omsg) - - class FrontendHighlighter(PygmentsHighlighter): """ A Python PygmentsHighlighter that can be turned on and off and which knows about continuation prompts. @@ -96,7 +51,7 @@ class FrontendWidget(HistoryConsoleWidget): """ # Emitted when an 'execute_reply' is received from the kernel. - executed = QtCore.pyqtSignal(Message) + executed = QtCore.pyqtSignal(object) #--------------------------------------------------------------------------- # 'QWidget' interface @@ -109,6 +64,7 @@ class FrontendWidget(HistoryConsoleWidget): self._compile = CommandCompiler() self._completion_lexer = CompletionLexer(PythonLexer()) self._highlighter = FrontendHighlighter(self) + self._kernel_manager = None self.document().contentsChange.connect(self._document_contents_change) @@ -183,14 +139,7 @@ class FrontendWidget(HistoryConsoleWidget): executed = code is not None if executed: - msg = self.session.send(self.request_socket, 'execute_request', - dict(code=source)) - thread = FrontendReplyThread(self) - if not hidden: - thread.output_received.connect(self._handle_output) - thread.reply_received.connect(self._handle_reply) - thread.finished.connect(thread.deleteLater) - thread.start() + self.kernel_manager.xreq_channel.execute(source) else: space = 0 for char in lines[-1]: @@ -221,9 +170,25 @@ class FrontendWidget(HistoryConsoleWidget): def _set_kernel_manager(self, kernel_manager): """ Sets a new kernel manager, configuring its channels as necessary. """ + # Disconnect the old kernel manager. + if self._kernel_manager is not None: + sub = self._kernel_manager.sub_channel + xreq = self._kernel_manager.xreq_channel + sub.message_received.disconnect(self._handle_sub) + xreq.execute_reply.disconnect(self._handle_execute_reply) + xreq.complete_reply.disconnect(self._handle_complete_reply) + xreq.object_info_reply.disconnect(self._handle_object_info_reply) + + # Connect the new kernel manager. self._kernel_manager = kernel_manager - self._sub_channel = kernel_manager.get_sub_channel() - self._xreq_channel = kernel_manager.get_xreq_channel() + sub = kernel_manager.sub_channel + xreq = kernel_manager.xreq_channel + sub.message_received.connect(self._handle_sub) + xreq.execute_reply.connect(self._handle_execute_reply) + #xreq.complete_reply.connect(self._handle_complete_reply) + #xreq.object_info_repy.connect(self._handle_object_info_reply) + + self._show_prompt('>>> ') kernel_manager = property(_get_kernel_manager, _set_kernel_manager) @@ -282,11 +247,6 @@ class FrontendWidget(HistoryConsoleWidget): self._complete_with_items(cursor, matches) return True - def _kernel_connected(self): - """ Called when the frontend is connected to a kernel. - """ - self._show_prompt('>>> ') - def _get_context(self, cursor=None): """ Gets the context at the current cursor location. """ @@ -310,46 +270,32 @@ class FrontendWidget(HistoryConsoleWidget): if position == self.textCursor().position(): self._call_tip() - def _handle_output(self, omsg): - handler = getattr(self, '_handle_%s' % omsg.msg_type, None) + def _handle_sub(self, omsg): + handler = getattr(self, '_handle_%s' % omsg['msg_type'], None) if handler is not None: handler(omsg) def _handle_pyout(self, omsg): - if omsg.parent_header.session == self.session.session: - self.appendPlainText(omsg.content.data + '\n') + session = omsg['parent_header']['session'] + if session == self.kernel_manager.session.session: + self.appendPlainText(omsg['content']['data'] + '\n') def _handle_stream(self, omsg): - self.appendPlainText(omsg.content.data) + self.appendPlainText(omsg['content']['data']) - def _handle_reply(self, rep): - if rep is not None: - if rep.msg_type == 'execute_reply': - if rep.content.status == 'error': - self.appendPlainText(rep.content.traceback[-1]) - elif rep.content.status == 'aborted': - text = "ERROR: ABORTED\n" - ab = self.messages[rep.parent_header.msg_id].content - if 'code' in ab: - text += ab.code - else: - text += ab - self.appendPlainText(text) - self._show_prompt('>>> ') - self.executed.emit(rep) + def _handle_execute_reply(self, rep): + content = rep['content'] + status = content['status'] + if status == 'error': + self.appendPlainText(content['traceback'][-1]) + elif status == 'aborted': + text = "ERROR: ABORTED\n" + self.appendPlainText(text) + self._show_prompt('>>> ') + self.executed.emit(rep) #------ Communication methods ---------------------------------------------- - def _recv_output(self): - omsgs = [] - while True: - omsg = self.session.recv(self.sub_socket) - if omsg is None: - break - else: - omsgs.append(omsg) - return omsgs - def _recv_reply(self): return self.session.recv(self.request_socket) @@ -364,26 +310,19 @@ class FrontendWidget(HistoryConsoleWidget): if __name__ == '__main__': import sys + from IPython.frontend.qt.kernelmanager import QtKernelManager - # Defaults - ip = '127.0.0.1' - port_base = 5555 - connection = ('tcp://%s' % ip) + ':%i' - req_conn = connection % port_base - sub_conn = connection % (port_base+1) - - # Create initial sockets - c = zmq.Context() - request_socket = c.socket(zmq.XREQ) - request_socket.connect(req_conn) - sub_socket = c.socket(zmq.SUB) - sub_socket.connect(sub_conn) - sub_socket.setsockopt(zmq.SUBSCRIBE, '') + # Create KernelManager + xreq_addr = ('127.0.0.1', 5575) + sub_addr = ('127.0.0.1', 5576) + rep_addr = ('127.0.0.1', 5577) + kernel_manager = QtKernelManager(xreq_addr, sub_addr, rep_addr) + kernel_manager.sub_channel.start() + kernel_manager.xreq_channel.start() # Launch application app = QtGui.QApplication(sys.argv) - widget = FrontendWidget(request_socket=request_socket, - sub_socket=sub_socket) + widget = FrontendWidget(kernel_manager) widget.setWindowTitle('Python') widget.resize(640, 480) widget.show() diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index 8868f55..637bd02 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -5,29 +5,30 @@ from PyQt4 import QtCore # IPython imports. -from IPython.zmq.kernel_manager import KernelManager, SubSocketChannel, \ +from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ XReqSocketChannel, RepSocketChannel -class QtKernelManager(KernelManager): - """ A KernelManager that provides channels that use signals and slots. - """ - - sub_channel_class = QtSubSocketChannel - xreq_channel_class = QtXReqSocketChannel - rep_channel_class = QtRepSocketChannel - - class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): # Emitted when any message is received. - message_received = QtCore.pyqtSignal(dict) + message_received = QtCore.pyqtSignal(object) # Emitted when a message of type 'pyout' or 'stdout' is received. - output_received = QtCore.pyqtSignal(dict) + output_received = QtCore.pyqtSignal(object) # Emitted when a message of type 'pyerr' or 'stderr' is received. - error_received = QtCore.pyqtSignal(dict) + error_received = QtCore.pyqtSignal(object) + + #--------------------------------------------------------------------------- + # 'object' interface + #--------------------------------------------------------------------------- + + def __init__(self, *args, **kw): + """ Reimplemented to ensure that QtCore.QObject is initialized first. + """ + QtCore.QObject.__init__(self) + SubSocketChannel.__init__(self, *args, **kw) #--------------------------------------------------------------------------- # 'SubSocketChannel' interface @@ -50,12 +51,22 @@ class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject): # Emitted when any message is received. - message_received = QtCore.pyqtSignal(dict) + message_received = QtCore.pyqtSignal(object) # Emitted when a reply has been received for the corresponding request type. - execute_reply = QtCore.pyqtSignal(dict) - complete_reply = QtCore.pyqtSignal(dict) - object_info_reply = QtCore.pyqtSignal(dict) + execute_reply = QtCore.pyqtSignal(object) + complete_reply = QtCore.pyqtSignal(object) + object_info_reply = QtCore.pyqtSignal(object) + + #--------------------------------------------------------------------------- + # 'object' interface + #--------------------------------------------------------------------------- + + def __init__(self, *args, **kw): + """ Reimplemented to ensure that QtCore.QObject is initialized first. + """ + QtCore.QObject.__init__(self) + XReqSocketChannel.__init__(self, *args, **kw) #--------------------------------------------------------------------------- # 'XReqSocketChannel' interface @@ -73,7 +84,29 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject): if signal: signal.emit(msg) + def _queue_request(self, msg, callback): + """ Reimplemented to skip callback handling. + """ + self.command_queue.put(msg) + class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): - pass + #--------------------------------------------------------------------------- + # 'object' interface + #--------------------------------------------------------------------------- + + def __init__(self, *args, **kw): + """ Reimplemented to ensure that QtCore.QObject is initialized first. + """ + QtCore.QObject.__init__(self) + RepSocketChannel.__init__(self, *args, **kw) + + +class QtKernelManager(KernelManager): + """ A KernelManager that provides channels that use signals and slots. + """ + + sub_channel_class = QtSubSocketChannel + xreq_channel_class = QtXReqSocketChannel + rep_channel_class = QtRepSocketChannel diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 2308c92..68d893a 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -4,13 +4,18 @@ TODO: Create logger to handle debugging and console messages. """ +# Standard library imports. from Queue import Queue, Empty from threading import Thread import traceback +# System library imports. import zmq from zmq import POLLIN, POLLOUT, POLLERR from zmq.eventloop import ioloop + +# Local imports. +from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type from session import Session @@ -18,56 +23,6 @@ class MissingHandlerError(Exception): pass -class KernelManager(object): - - sub_channel_class = SubSocketChannel - xreq_channel_class = XReqSocketChannel - rep_channel_class = RepSocketChannel - - def __init__(self, xreq_addr, sub_addr, rep_addr, - context=None, session=None): - self.context = zmq.Context() if context is None else context - self.session = Session() if session is None else session - self.xreq_addr = xreq_addr - self.sub_addr = sub_addr - self.rep_addr = rep_addr - - def start_kernel(self): - """Start a localhost kernel on ip and port. - - The SUB channel is for the frontend to receive messages published by - the kernel. - - The REQ channel is for the frontend to make requests of the kernel. - - The REP channel is for the kernel to request stdin (raw_input) from - the frontend. - """ - - def kill_kernel(self): - """Kill the running kernel""" - - def is_alive(self): - """Is the kernel alive?""" - return True - - def signal_kernel(self, signum): - """Send signum to the kernel.""" - - def get_sub_channel(self): - """Get the SUB socket channel object.""" - return self.sub_channel_class(self.context, self.session, self.sub_addr) - - def get_xreq_channel(self): - """Get the REQ socket channel object to make requests of the kernel.""" - return self.xreq_channel_class(self.context, self.session, - self.xreq_addr) - - def get_rep_channel(self): - """Get the REP socket channel object to handle stdin (raw_input).""" - return self.rep_channel_class(self.context, self.session, self.rep_addr) - - class ZmqSocketChannel(Thread): socket = None @@ -231,7 +186,8 @@ class XReqSocketChannel(ZmqSocketChannel): return callback handler = self.handlers.get(name) if handler is None: - raise MissingHandlerError('No handler defined for method: %s' % name) + raise MissingHandlerError( + 'No handler defined for method: %s' % name) return handler def override_call_handler(self, func): @@ -273,3 +229,90 @@ class RepSocketChannel(ZmqSocketChannel): def on_raw_input(): pass + + +class KernelManager(HasTraits): + + # The addresses to use for the various channels. Should be tuples of form + # (ip_address, port). + sub_address = Any + xreq_address = Any + rep_address = Any + # FIXME: Add Tuple to Traitlets. + #sub_address = Tuple(Str, Int) + #xreq_address = Tuple(Str, Int) + #rep_address = Tuple(Str, Int) + + # The PyZMQ Context to use for communication with the kernel. + context = Instance(zmq.Context, ()) + + # The Session to use for communication with the kernel. + session = Instance(Session, ()) + + # The classes to use for the various channels. + sub_channel_class = Type(SubSocketChannel) + xreq_channel_class = Type(XReqSocketChannel) + rep_channel_class = Type(RepSocketChannel) + + # Protected traits. + _sub_channel = Any + _xreq_channel = Any + _rep_channel = Any + + def __init__(self, xreq_address, sub_address, rep_address, **traits): + super(KernelManager, self).__init__() + + self.xreq_address = xreq_address + self.sub_address = sub_address + self.rep_address = rep_address + + # FIXME: This should be the business of HasTraits. The convention is: + # HasTraits.__init__(self, **traits_to_be_initialized.) + for trait in traits: + setattr(self, trait, traits[trait]) + + def start_kernel(self): + """Start a localhost kernel on ip and port. + + The SUB channel is for the frontend to receive messages published by + the kernel. + + The REQ channel is for the frontend to make requests of the kernel. + + The REP channel is for the kernel to request stdin (raw_input) from + the frontend. + """ + + def kill_kernel(self): + """Kill the running kernel""" + + def is_alive(self): + """Is the kernel alive?""" + return True + + def signal_kernel(self, signum): + """Send signum to the kernel.""" + + @property + def sub_channel(self): + """Get the SUB socket channel object.""" + if self._sub_channel is None: + self._sub_channel = self.sub_channel_class( + self.context, self.session, self.sub_address) + return self._sub_channel + + @property + def xreq_channel(self): + """Get the REQ socket channel object to make requests of the kernel.""" + if self._xreq_channel is None: + self._xreq_channel = self.xreq_channel_class( + self.context, self.session, self.xreq_address) + return self._xreq_channel + + @property + def rep_channel(self): + """Get the REP socket channel object to handle stdin (raw_input).""" + if self._rep_channel is None: + self._rep_channel = self.rep_channel_class( + self.context, self.session, self.rep_address) + return self._rep_channel diff --git a/IPython/zmq/ktest.py b/IPython/zmq/ktest.py index d4e6a24..592f935 100644 --- a/IPython/zmq/ktest.py +++ b/IPython/zmq/ktest.py @@ -9,8 +9,8 @@ rep_addr = ('127.0.0.1', 5577) km = KernelManager(xreq_addr, sub_addr, rep_addr) -# xreq_channel = km.get_xreq_channel() -sub_channel = km.get_sub_channel() +# xreq_channel = km.xreq_channel +sub_channel = km.sub_channel # xreq_channel.start() sub_channel.start() @@ -49,4 +49,4 @@ for i in range(100): time.sleep(1) # xreq_channel.join() -sub_channel.join() \ No newline at end of file +sub_channel.join()