diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index 3afcfa1..1119ef8 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -179,10 +179,12 @@ class FrontendWidget(HistoryConsoleWidget): # Disconnect the old kernel manager's channels. sub = self._kernel_manager.sub_channel xreq = self._kernel_manager.xreq_channel + rep = self._kernel_manager.rep_channel sub.message_received.disconnect(self._handle_sub) xreq.execute_reply.disconnect(self._handle_execute_reply) xreq.complete_reply.disconnect(self._handle_complete_reply) xreq.object_info_reply.disconnect(self._handle_object_info_reply) + rep.readline_requested.disconnect(self._handle_req) # Handle the case where the old kernel manager is still listening. if self._kernel_manager.channels_running: @@ -200,10 +202,12 @@ class FrontendWidget(HistoryConsoleWidget): # Connect the new kernel manager's channels. sub = kernel_manager.sub_channel xreq = kernel_manager.xreq_channel + rep = kernel_manager.rep_channel sub.message_received.connect(self._handle_sub) xreq.execute_reply.connect(self._handle_execute_reply) xreq.complete_reply.connect(self._handle_complete_reply) xreq.object_info_reply.connect(self._handle_object_info_reply) + rep.readline_requested.connect(self._handle_req) # Handle the case where the kernel manager started channels before # we connected. @@ -292,10 +296,9 @@ class FrontendWidget(HistoryConsoleWidget): if position == self.textCursor().position(): self._call_tip() - def _handle_req(self): + def _handle_req(self, req): def callback(line): - print repr(line) - self._show_prompt() + self.kernel_manager.rep_channel.readline(line) self._readline(callback=callback) def _handle_sub(self, omsg): diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index de05231..f2e21ea 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -95,6 +95,12 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject): class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): + # Emitted when any message is received. + message_received = QtCore.pyqtSignal(object) + + # Emitted when a readline request is received. + readline_requested = QtCore.pyqtSignal(object) + #--------------------------------------------------------------------------- # 'object' interface #--------------------------------------------------------------------------- @@ -105,6 +111,21 @@ class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): QtCore.QObject.__init__(self) RepSocketChannel.__init__(self, *args, **kw) + #--------------------------------------------------------------------------- + # 'RepSocketChannel' interface + #--------------------------------------------------------------------------- + + def call_handlers(self, msg): + """ Reimplemented to emit signals instead of making callbacks. + """ + # Emit the generic signal. + self.message_received.emit(msg) + + # Emit signals for specialized message types. + msg_type = msg['msg_type'] + if msg_type == 'readline_request': + self.readline_requested.emit(msg) + class QtKernelManager(KernelManager, QtCore.QObject): """ A KernelManager that provides signals and slots. diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py index 1365e4c..17399ae 100755 --- a/IPython/zmq/kernel.py +++ b/IPython/zmq/kernel.py @@ -55,9 +55,10 @@ class InStream(object): if self.socket is None: raise ValueError(u'I/O operation on closed file') else: - content = { u'size' : unicode(size) } - msg = self.session.msg(u'readline', content=content) - return self._request(msg) + content = dict(size=size) + msg = self.session.msg('readline_request', content=content) + reply = self._request(msg) + return reply['content']['line'] def readlines(self, size=-1): raise NotImplementedError @@ -83,7 +84,7 @@ class InStream(object): raise else: break - return reply[u'content'][u'data'] + return reply class OutStream(object): diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 1941123..166147d 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -199,7 +199,6 @@ class XReqSocketChannel(ZmqSocketChannel): Returns ------- The msg_id of the message sent. - """ content = dict(text=text, line=line) msg = self.session.msg('complete_request', content) @@ -338,24 +337,84 @@ class SubSocketChannel(ZmqSocketChannel): class RepSocketChannel(ZmqSocketChannel): """A reply channel to handle raw_input requests that the kernel makes.""" + msg_queue = None + + def __init__(self, context, session, address): + self.msg_queue = Queue() + super(RepSocketChannel, self).__init__(context, session, address) + def run(self): """The thread's main activity. Call start() instead.""" + self.socket = self.context.socket(zmq.XREQ) + self.socket.setsockopt(zmq.IDENTITY, self.session.session) + self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() + self.iostate = POLLERR|POLLIN + self.ioloop.add_handler(self.socket, self._handle_events, + self.iostate) self.ioloop.start() def stop(self): self.ioloop.stop() super(RepSocketChannel, self).stop() - def on_raw_input(self): - pass + def call_handlers(self, msg): + """This method is called in the ioloop thread when a message arrives. + + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application leve + handlers are called in the application thread. + """ + raise NotImplementedError('call_handlers must be defined in a subclass.') + + def readline(self, line): + """A send a line of raw input to the kernel. + + Parameters + ---------- + line : str + The line of the input. + """ + content = dict(line=line) + msg = self.session.msg('readline_reply', content) + self._queue_reply(msg) + + def _handle_events(self, socket, events): + if events & POLLERR: + self._handle_err() + if events & POLLOUT: + self._handle_send() + if events & POLLIN: + self._handle_recv() + + def _handle_recv(self): + msg = self.socket.recv_json() + self.call_handlers(msg) + + def _handle_send(self): + try: + msg = self.msg_queue.get(False) + except Empty: + pass + else: + self.socket.send_json(msg) + if self.msg_queue.empty(): + self.drop_io_state(POLLOUT) + + def _handle_err(self): + # We don't want to let this go silently, so eventually we should log. + raise zmq.ZMQError() + + def _queue_reply(self, msg): + self.msg_queue.put(msg) + self.add_io_state(POLLOUT) #----------------------------------------------------------------------------- # Main kernel manager class #----------------------------------------------------------------------------- - class KernelManager(HasTraits): """ Manages a kernel for a frontend.