From d93fea116476c11bd4e86983ff95b9c18a56cac7 2010-08-04 19:16:21
From: epatters <epatters@enthought.com>
Date: 2010-08-04 19:16:21
Subject: [PATCH] Basic raw_input implementation is now working.

---

diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py
index 3afcfa1..1119ef8 100644
--- a/IPython/frontend/qt/console/frontend_widget.py
+++ b/IPython/frontend/qt/console/frontend_widget.py
@@ -179,10 +179,12 @@ class FrontendWidget(HistoryConsoleWidget):
             # Disconnect the old kernel manager's channels.
             sub = self._kernel_manager.sub_channel
             xreq = self._kernel_manager.xreq_channel
+            rep = self._kernel_manager.rep_channel
             sub.message_received.disconnect(self._handle_sub)
             xreq.execute_reply.disconnect(self._handle_execute_reply)
             xreq.complete_reply.disconnect(self._handle_complete_reply)
             xreq.object_info_reply.disconnect(self._handle_object_info_reply)
+            rep.readline_requested.disconnect(self._handle_req)
 
             # Handle the case where the old kernel manager is still listening.
             if self._kernel_manager.channels_running:
@@ -200,10 +202,12 @@ class FrontendWidget(HistoryConsoleWidget):
         # Connect the new kernel manager's channels.
         sub = kernel_manager.sub_channel
         xreq = kernel_manager.xreq_channel
+        rep = kernel_manager.rep_channel
         sub.message_received.connect(self._handle_sub)
         xreq.execute_reply.connect(self._handle_execute_reply)
         xreq.complete_reply.connect(self._handle_complete_reply)
         xreq.object_info_reply.connect(self._handle_object_info_reply)
+        rep.readline_requested.connect(self._handle_req)
         
         # Handle the case where the kernel manager started channels before
         # we connected.
@@ -292,10 +296,9 @@ class FrontendWidget(HistoryConsoleWidget):
         if position == self.textCursor().position():
             self._call_tip()
 
-    def _handle_req(self):
+    def _handle_req(self, req):
         def callback(line):
-            print repr(line)
-            self._show_prompt()
+            self.kernel_manager.rep_channel.readline(line)
         self._readline(callback=callback)
 
     def _handle_sub(self, omsg):
diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py
index de05231..f2e21ea 100644
--- a/IPython/frontend/qt/kernelmanager.py
+++ b/IPython/frontend/qt/kernelmanager.py
@@ -95,6 +95,12 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
 
 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
 
+    # Emitted when any message is received.
+    message_received = QtCore.pyqtSignal(object)
+
+    # Emitted when a readline request is received.
+    readline_requested = QtCore.pyqtSignal(object)
+
     #---------------------------------------------------------------------------
     # 'object' interface
     #---------------------------------------------------------------------------
@@ -105,6 +111,21 @@ class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
         QtCore.QObject.__init__(self)
         RepSocketChannel.__init__(self, *args, **kw)
 
+    #---------------------------------------------------------------------------
+    # 'RepSocketChannel' interface
+    #---------------------------------------------------------------------------
+
+    def call_handlers(self, msg):
+        """ Reimplemented to emit signals instead of making callbacks.
+        """
+        # Emit the generic signal.
+        self.message_received.emit(msg)
+        
+        # Emit signals for specialized message types.
+        msg_type = msg['msg_type']
+        if msg_type == 'readline_request':
+            self.readline_requested.emit(msg)
+    
 
 class QtKernelManager(KernelManager, QtCore.QObject):
     """ A KernelManager that provides signals and slots.
diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py
index 1365e4c..17399ae 100755
--- a/IPython/zmq/kernel.py
+++ b/IPython/zmq/kernel.py
@@ -55,9 +55,10 @@ class InStream(object):
         if self.socket is None:
             raise ValueError(u'I/O operation on closed file')
         else:
-            content = { u'size' : unicode(size) }
-            msg = self.session.msg(u'readline', content=content) 
-            return self._request(msg)
+            content = dict(size=size)
+            msg = self.session.msg('readline_request', content=content) 
+            reply = self._request(msg)
+            return reply['content']['line']
 
     def readlines(self, size=-1):
         raise NotImplementedError
@@ -83,7 +84,7 @@ class InStream(object):
                     raise
             else:
                 break
-        return reply[u'content'][u'data']
+        return reply
 
 
 class OutStream(object):
diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py
index 1941123..166147d 100644
--- a/IPython/zmq/kernelmanager.py
+++ b/IPython/zmq/kernelmanager.py
@@ -199,7 +199,6 @@ class XReqSocketChannel(ZmqSocketChannel):
         Returns
         -------
         The msg_id of the message sent.
-
         """
         content = dict(text=text, line=line)
         msg = self.session.msg('complete_request', content)
@@ -338,24 +337,84 @@ class SubSocketChannel(ZmqSocketChannel):
 class RepSocketChannel(ZmqSocketChannel):
     """A reply channel to handle raw_input requests that the kernel makes."""
 
+    msg_queue = None
+
+    def __init__(self, context, session, address):
+        self.msg_queue = Queue()
+        super(RepSocketChannel, self).__init__(context, session, address)
+
     def run(self):
         """The thread's main activity.  Call start() instead."""
+        self.socket = self.context.socket(zmq.XREQ)
+        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
+        self.socket.connect('tcp://%s:%i' % self.address)
         self.ioloop = ioloop.IOLoop()
+        self.iostate = POLLERR|POLLIN
+        self.ioloop.add_handler(self.socket, self._handle_events, 
+                                self.iostate)
         self.ioloop.start()
 
     def stop(self):
         self.ioloop.stop()
         super(RepSocketChannel, self).stop()
 
-    def on_raw_input(self):
-        pass
+    def call_handlers(self, msg):
+        """This method is called in the ioloop thread when a message arrives.
+
+        Subclasses should override this method to handle incoming messages.
+        It is important to remember that this method is called in the thread
+        so that some logic must be done to ensure that the application leve
+        handlers are called in the application thread.
+        """
+        raise NotImplementedError('call_handlers must be defined in a subclass.')
+
+    def readline(self, line):
+        """A send a line of raw input to the kernel.
+
+        Parameters
+        ----------
+        line : str
+            The line of the input.
+        """
+        content = dict(line=line)
+        msg = self.session.msg('readline_reply', content)
+        self._queue_reply(msg)
+
+    def _handle_events(self, socket, events):
+        if events & POLLERR:
+            self._handle_err()
+        if events & POLLOUT:
+            self._handle_send()
+        if events & POLLIN:
+            self._handle_recv()
+
+    def _handle_recv(self):
+        msg = self.socket.recv_json()
+        self.call_handlers(msg)
+
+    def _handle_send(self):
+        try:
+            msg = self.msg_queue.get(False)
+        except Empty:
+            pass
+        else:
+            self.socket.send_json(msg)
+        if self.msg_queue.empty():
+            self.drop_io_state(POLLOUT)
+
+    def _handle_err(self):
+        # We don't want to let this go silently, so eventually we should log.
+        raise zmq.ZMQError()
+
+    def _queue_reply(self, msg):
+        self.msg_queue.put(msg)
+        self.add_io_state(POLLOUT)
 
 
 #-----------------------------------------------------------------------------
 # Main kernel manager class
 #-----------------------------------------------------------------------------
 
-
 class KernelManager(HasTraits):
     """ Manages a kernel for a frontend.