From c46f948dfce115a553578dae00178c4be6c35210 2010-08-04 14:25:24
From: epatters <epatters@enthought.com>
Date: 2010-08-04 14:25:24
Subject: [PATCH] Merge branch 'kernelmanager' of git://github.com/ellisonbg/ipython into qtfrontend. Fixed breakage and conflicts from merge.

Conflicts:
	IPython/frontend/qt/console/ipython_widget.py

---

diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py
index f5ab175..afaa3e1 100644
--- a/IPython/frontend/qt/console/frontend_widget.py
+++ b/IPython/frontend/qt/console/frontend_widget.py
@@ -161,10 +161,10 @@ class FrontendWidget(HistoryConsoleWidget):
         """
         # Disconnect the old kernel manager, if necessary.
         if self._kernel_manager is not None:
-            self._kernel_manager.started_listening.disconnect(
-                self._started_listening)
-            self._kernel_manager.stopped_listening.disconnect(
-                self._stopped_listening)
+            self._kernel_manager.started_channels.disconnect(
+                self._started_channels)
+            self._kernel_manager.stopped_channels.disconnect(
+                self._stopped_channels)
 
             # Disconnect the old kernel manager's channels.
             sub = self._kernel_manager.sub_channel
@@ -174,9 +174,9 @@ class FrontendWidget(HistoryConsoleWidget):
             xreq.complete_reply.disconnect(self._handle_complete_reply)
             xreq.object_info_reply.disconnect(self._handle_object_info_reply)
 
-            # Handle the case where the old kernel manager is still listening.
-            if self._kernel_manager.is_listening:
-                self._stopped_listening()
+            # Handle an old kernel manager whose channels are still running.
+            if self._kernel_manager.channels_running:
+                self._stopped_channels()
 
         # Set the new kernel manager.
         self._kernel_manager = kernel_manager
@@ -184,8 +184,8 @@ class FrontendWidget(HistoryConsoleWidget):
             return
 
         # Connect the new kernel manager.
-        kernel_manager.started_listening.connect(self._started_listening)
-        kernel_manager.stopped_listening.connect(self._stopped_listening)
+        kernel_manager.started_channels.connect(self._started_channels)
+        kernel_manager.stopped_channels.connect(self._stopped_channels)
 
         # Connect the new kernel manager's channels.
         sub = kernel_manager.sub_channel
@@ -195,10 +195,10 @@ class FrontendWidget(HistoryConsoleWidget):
         xreq.complete_reply.connect(self._handle_complete_reply)
         xreq.object_info_reply.connect(self._handle_object_info_reply)
         
-        # Handle the case where the kernel manager started listening before
+        # Handle the case where the kernel manager started channels before
         # we connected.
-        if kernel_manager.is_listening:
-            self._started_listening()
+        if kernel_manager.channels_running:
+            self._started_channels()
 
     kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
 
@@ -323,8 +323,8 @@ class FrontendWidget(HistoryConsoleWidget):
             if doc:
                 self._call_tip_widget.show_docstring(doc)
 
-    def _started_listening(self):
+    def _started_channels(self):
         self.clear()
 
-    def _stopped_listening(self):
+    def _stopped_channels(self):
         pass
diff --git a/IPython/frontend/qt/console/ipython_widget.py b/IPython/frontend/qt/console/ipython_widget.py
index b40b20f..b26b639 100644
--- a/IPython/frontend/qt/console/ipython_widget.py
+++ b/IPython/frontend/qt/console/ipython_widget.py
@@ -82,7 +82,7 @@ if __name__ == '__main__':
     # Create a KernelManager.
     kernel_manager = QtKernelManager()
     kernel_manager.start_kernel()
-    kernel_manager.start_listening()
+    kernel_manager.start_channels()
 
     # Launch the application.
     app = QtGui.QApplication([])
diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py
index f546479..1ae72bf 100644
--- a/IPython/frontend/qt/kernelmanager.py
+++ b/IPython/frontend/qt/kernelmanager.py
@@ -93,12 +93,6 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
         if signal:
             signal.emit(msg)
 
-    def _queue_request(self, msg, callback):
-        """ Reimplemented to skip callback handling.
-        """
-        self.command_queue.put(msg)
-        self.add_io_state(zmq.POLLOUT)
-
 
 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
 
@@ -120,10 +114,10 @@ class QtKernelManager(KernelManager, QtCore.QObject):
     __metaclass__ = MetaQObjectHasTraits
 
     # Emitted when the kernel manager has started listening.
-    started_listening = QtCore.pyqtSignal()
+    started_channels = QtCore.pyqtSignal()
 
     # Emitted when the kernel manager has stopped listening.
-    stopped_listening = QtCore.pyqtSignal()
+    stopped_channels = QtCore.pyqtSignal()
 
     # Use Qt-specific channel classes that emit signals.
     sub_channel_class = QtSubSocketChannel
@@ -131,17 +125,27 @@ class QtKernelManager(KernelManager, QtCore.QObject):
     rep_channel_class = QtRepSocketChannel
 
     #---------------------------------------------------------------------------
+    # 'object' interface
+    #---------------------------------------------------------------------------
+    
+    def __init__(self, *args, **kw):
+        """ Reimplemented to ensure that QtCore.QObject is initialized first.
+        """
+        QtCore.QObject.__init__(self)
+        KernelManager.__init__(self, *args, **kw)
+
+    #---------------------------------------------------------------------------
     # 'KernelManager' interface
     #---------------------------------------------------------------------------
     
-    def start_listening(self):
+    def start_channels(self):
         """ Reimplemented to emit signal.
         """
-        super(QtKernelManager, self).start_listening()
-        self.started_listening.emit()
+        super(QtKernelManager, self).start_channels()
+        self.started_channels.emit()
 
-    def stop_listening(self):
+    def stop_channels(self):
         """ Reimplemented to emit signal.
         """ 
-        super(QtKernelManager, self).stop_listening()
-        self.stopped_listening.emit()
+        super(QtKernelManager, self).stop_channels()
+        self.stopped_channels.emit()
diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py
index 0829c35..7b879e0 100644
--- a/IPython/zmq/kernelmanager.py
+++ b/IPython/zmq/kernelmanager.py
@@ -1,15 +1,27 @@
-"""Kernel frontend classes.
+"""Classes to manage the interaction with a running kernel.
 
-TODO: Create logger to handle debugging and console messages.
+Todo
+====
 
+* Create logger to handle debugging and console messages.
 """
 
+#-----------------------------------------------------------------------------
+#  Copyright (C) 2008-2010  The IPython Development Team
+#
+#  Distributed under the terms of the BSD License.  The full license is in
+#  the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
 # Standard library imports.
 from Queue import Queue, Empty
 from subprocess import Popen
 from threading import Thread
 import time
-import traceback
 
 # System library imports.
 import zmq
@@ -17,70 +29,80 @@ from zmq import POLLIN, POLLOUT, POLLERR
 from zmq.eventloop import ioloop
 
 # Local imports.
-from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
-    Type
+from IPython.utils.traitlets import HasTraits, Any, Instance, Type
 from kernel import launch_kernel
 from session import Session
 
-# Constants.
-LOCALHOST = '127.0.0.1'
+#-----------------------------------------------------------------------------
+# Constants and exceptions
+#-----------------------------------------------------------------------------
 
+LOCALHOST = '127.0.0.1'
 
-class MissingHandlerError(Exception):
+class InvalidPortNumber(Exception):
     pass
 
+#-----------------------------------------------------------------------------
+# ZMQ Socket Channel classes
+#-----------------------------------------------------------------------------
 
 class ZmqSocketChannel(Thread):
-    """ The base class for the channels that use ZMQ sockets.
+    """The base class for the channels that use ZMQ sockets.
     """
-
     context = None
     session = None
     socket = None
     ioloop = None
     iostate = None
+    _address = None
 
-    def __init__(self, context, session, address=None):
+    def __init__(self, context, session, address):
+        """Create a channel
+
+        Parameters
+        ----------
+        context : zmq.Context
+            The ZMQ context to use.
+        session : session.Session
+            The session to use.
+        address : tuple
+            Standard (ip, port) tuple that the kernel is listening on.
+        """
         super(ZmqSocketChannel, self).__init__()
         self.daemon = True
 
         self.context = context
         self.session = session
-        self.address = address
+        if address[1] == 0:
+            raise InvalidPortNumber('The port number for a channel cannot be 0.')
+        self._address = address
 
     def stop(self):
-        """Stop the thread's activity. Returns when the thread terminates.
+        """Stop the channel's activity.
 
-        The thread will raise :class:`RuntimeError` if :method:`self.start`
-        is called again.
+        This calls :method:`Thread.join` and returns when the thread
+        terminates. :class:`RuntimeError` will be raised if 
+        :method:`self.start` is called again.
         """
         self.join()
 
-
-    def get_address(self):
-        """ Get the channel's address. By the default, a channel is on 
-            localhost with no port specified (a negative port number).
+    @property
+    def address(self):
+        """Get the channel's address as an (ip, port) tuple.
+        
+        The port is never 0 here: the constructor raises InvalidPortNumber
+        when given port 0, so a concrete port must always be supplied.
         """
         return self._address
 
-    def set_adresss(self, address):
-        """ Set the channel's address. Should be a tuple of form:
-                (ip address [str], port [int]).
-            or None, in which case the address is reset to its default value.
-        """
-        # FIXME: Validate address.
-        if self.is_alive():  # This is Thread.is_alive
-            raise RuntimeError("Cannot set address on a running channel!")
-        else:
-            if address is None:
-                address = (LOCALHOST, 0)
-            self._address = address
-
-    address = property(get_address, set_adresss)
-
     def add_io_state(self, state):
         """Add IO state to the eventloop.
 
+        Parameters
+        ----------
+        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
+            The IO state flag to set.
+
         This is thread safe as it uses the thread safe IOLoop.add_callback.
         """
         def add_io_state_callback():
@@ -92,6 +114,11 @@ class ZmqSocketChannel(Thread):
     def drop_io_state(self, state):
         """Drop IO state from the eventloop.
 
+        Parameters
+        ----------
+        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
+            The IO state flag to drop.
+
         This is thread safe as it uses the thread safe IOLoop.add_callback.
         """
         def drop_io_state_callback():
@@ -101,48 +128,30 @@ class ZmqSocketChannel(Thread):
         self.ioloop.add_callback(drop_io_state_callback)
 
 
-class SubSocketChannel(ZmqSocketChannel):
+class XReqSocketChannel(ZmqSocketChannel):
+    """The XREQ channel for request/reply exchanges with the kernel.
+    """
 
-    def __init__(self, context, session, address=None):
-        super(SubSocketChannel, self).__init__(context, session, address)
+    command_queue = None
+
+    def __init__(self, context, session, address):
+        self.command_queue = Queue()
+        super(XReqSocketChannel, self).__init__(context, session, address)
 
     def run(self):
-        self.socket = self.context.socket(zmq.SUB)
-        self.socket.setsockopt(zmq.SUBSCRIBE,'')
+        """The thread's main activity.  Call start() instead."""
+        self.socket = self.context.socket(zmq.XREQ)
         self.socket.setsockopt(zmq.IDENTITY, self.session.session)
         self.socket.connect('tcp://%s:%i' % self.address)
         self.ioloop = ioloop.IOLoop()
-        self.iostate = POLLIN|POLLERR
+        self.iostate = POLLERR|POLLIN
         self.ioloop.add_handler(self.socket, self._handle_events, 
                                 self.iostate)
         self.ioloop.start()
 
     def stop(self):
         self.ioloop.stop()
-        super(SubSocketChannel, self).stop()
-
-    def _handle_events(self, socket, events):
-        # Turn on and off POLLOUT depending on if we have made a request
-        if events & POLLERR:
-            self._handle_err()
-        if events & POLLIN:
-            self._handle_recv()
-
-    def _handle_err(self):
-        # We don't want to let this go silently, so eventually we should log.
-        raise zmq.ZMQError()
-
-    def _handle_recv(self):
-        # Get all of the messages we can
-        while True:
-            try:
-                msg = self.socket.recv_json(zmq.NOBLOCK)
-            except zmq.ZMQError:
-                # Check the errno?
-                # Will this tigger POLLERR?
-                break
-            else:
-                self.call_handlers(msg)
+        super(XReqSocketChannel, self).stop()
 
     def call_handlers(self, msg):
         """This method is called in the ioloop thread when a message arrives.
@@ -154,59 +163,65 @@ class SubSocketChannel(ZmqSocketChannel):
         """
         raise NotImplementedError('call_handlers must be defined in a subclass.')
 
-    def flush(self, timeout=1.0):
-        """Immediately processes all pending messages on the SUB channel.
-
-        This method is thread safe.
+    def execute(self, code):
+        """Execute code in the kernel.
 
         Parameters
         ----------
-        timeout : float, optional
-            The maximum amount of time to spend flushing, in seconds. The
-            default is one second.
-        """
-        # We do the IOLoop callback process twice to ensure that the IOLoop
-        # gets to perform at least one full poll.
-        stop_time = time.time() + timeout
-        for i in xrange(2):
-            self._flushed = False
-            self.ioloop.add_callback(self._flush)
-            while not self._flushed and time.time() < stop_time:
-                time.sleep(0.01)
-        
-    def _flush(self):
-        """Called in this thread by the IOLoop to indicate that all events have
-        been processed.
+        code : str
+            A string of Python code.
+
+        Returns
+        -------
+        The msg_id of the message sent.
         """
-        self._flushed = True
+        # Create class for content/msg creation. Related to, but possibly
+        # not in Session.
+        content = dict(code=code)
+        msg = self.session.msg('execute_request', content)
+        self._queue_request(msg)
+        return msg['header']['msg_id']
 
+    def complete(self, text, line, block=None):
+        """Tab complete text, line, block in the kernel's namespace.
 
-class XReqSocketChannel(ZmqSocketChannel):
+        Parameters
+        ----------
+        text : str
+            The text to complete.
+        line : str
+            The full line of text that is the surrounding context for the 
+            text to complete.
+        block : str
+            The full block of code in which the completion is being requested.
+
+        Returns
+        -------
+        The msg_id of the message sent.
 
-    handler_queue = None
-    command_queue = None
-    handlers = None
-    _overriden_call_handler = None
+        """
+        content = dict(text=text, line=line)
+        msg = self.session.msg('complete_request', content)
+        self._queue_request(msg)
+        return msg['header']['msg_id']
 
-    def __init__(self, context, session, address=None):
-        self.handlers = {}
-        self.handler_queue = Queue()
-        self.command_queue = Queue()
-        super(XReqSocketChannel, self).__init__(context, session, address)
+    def object_info(self, oname):
+        """Get metadata information about an object.
 
-    def run(self):
-        self.socket = self.context.socket(zmq.XREQ)
-        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
-        self.socket.connect('tcp://%s:%i' % self.address)
-        self.ioloop = ioloop.IOLoop()
-        self.iostate = POLLERR|POLLIN
-        self.ioloop.add_handler(self.socket, self._handle_events, 
-                                self.iostate)
-        self.ioloop.start()
-
-    def stop(self):
-        self.ioloop.stop()
-        super(XReqSocketChannel, self).stop()
+        Parameters
+        ----------
+        oname : str
+            A string specifying the object name.
+        
+        Returns
+        -------
+        The msg_id of the message sent.
+        """
+        # Wrap the object name in an 'object_info_request' and queue it.
+        content = dict(oname=oname)
+        msg = self.session.msg('object_info_request', content)
+        self._queue_request(msg)
+        return msg['header']['msg_id']
 
     def _handle_events(self, socket, events):
         if events & POLLERR:
@@ -234,82 +249,113 @@ class XReqSocketChannel(ZmqSocketChannel):
         # We don't want to let this go silently, so eventually we should log.
         raise zmq.ZMQError()
 
-    def _queue_request(self, msg, callback):
-        handler = self._find_handler(msg['msg_type'], callback)
-        self.handler_queue.put(handler)
+    def _queue_request(self, msg):
         self.command_queue.put(msg)
         self.add_io_state(POLLOUT)
 
-    def execute(self, code, callback=None):
-        # Create class for content/msg creation. Related to, but possibly
-        # not in Session.
-        content = dict(code=code)
-        msg = self.session.msg('execute_request', content)
-        self._queue_request(msg, callback)
-        return msg['header']['msg_id']
 
-    def complete(self, text, line, block=None, callback=None):
-        content = dict(text=text, line=line)
-        msg = self.session.msg('complete_request', content)
-        self._queue_request(msg, callback)
-        return msg['header']['msg_id']
+class SubSocketChannel(ZmqSocketChannel):
+    """The SUB channel which listens for messages that the kernel publishes.
+    """
 
-    def object_info(self, oname, callback=None):
-        content = dict(oname=oname)
-        msg = self.session.msg('object_info_request', content)
-        self._queue_request(msg, callback)
-        return msg['header']['msg_id']
+    def __init__(self, context, session, address):
+        super(SubSocketChannel, self).__init__(context, session, address)
 
-    def _find_handler(self, name, callback):
-        if callback is not None:
-            return callback
-        handler = self.handlers.get(name)
-        if handler is None:
-            raise MissingHandlerError(
-                'No handler defined for method: %s' % name)
-        return handler
-
-    def override_call_handler(self, func):
-        """Permanently override the call_handler.
-    
-        The function func will be called as::
+    def run(self):
+        """The thread's main activity.  Call start() instead."""
+        self.socket = self.context.socket(zmq.SUB)
+        self.socket.setsockopt(zmq.SUBSCRIBE,'')
+        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
+        self.socket.connect('tcp://%s:%i' % self.address)
+        self.ioloop = ioloop.IOLoop()
+        self.iostate = POLLIN|POLLERR
+        self.ioloop.add_handler(self.socket, self._handle_events, 
+                                self.iostate)
+        self.ioloop.start()
 
-            func(handler, msg)
+    def stop(self):
+        self.ioloop.stop()
+        super(SubSocketChannel, self).stop()
 
-        And must call::
-        
-            handler(msg)
+    def call_handlers(self, msg):
+        """This method is called in the ioloop thread when a message arrives.
 
-        in the main thread.
+        Subclasses should override this method to handle incoming messages.
+        It is important to remember that this method is called in the thread
+        so that some logic must be done to ensure that the application level
+        handlers are called in the application thread.
         """
-        assert callable(func), "not a callable: %r" % func
-        self._overriden_call_handler = func
+        raise NotImplementedError('call_handlers must be defined in a subclass.')
 
-    def call_handlers(self, msg):
-        try:
-            handler = self.handler_queue.get(False)
-        except Empty:
-            print "Message received with no handler!!!"
-            print msg
-        else:
-            self.call_handler(handler, msg)
-
-    def call_handler(self, handler, msg):
-        if self._overriden_call_handler is not None:
-            self._overriden_call_handler(handler, msg)
-        elif hasattr(self, '_call_handler'):
-           call_handler = getattr(self, '_call_handler')
-           call_handler(handler, msg)
-        else:
-            raise RuntimeError('no handler!')
+    def flush(self, timeout=1.0):
+        """Immediately processes all pending messages on the SUB channel.
+
+        This method is thread safe.
+
+        Parameters
+        ----------
+        timeout : float, optional
+            The maximum amount of time to spend flushing, in seconds. The
+            default is one second.
+        """
+        # We do the IOLoop callback process twice to ensure that the IOLoop
+        # gets to perform at least one full poll.
+        stop_time = time.time() + timeout
+        for i in xrange(2):
+            self._flushed = False
+            self.ioloop.add_callback(self._flush)
+            while not self._flushed and time.time() < stop_time:
+                time.sleep(0.01)
+
+    def _handle_events(self, socket, events):
+        # Turn on and off POLLOUT depending on if we have made a request
+        if events & POLLERR:
+            self._handle_err()
+        if events & POLLIN:
+            self._handle_recv()
+
+    def _handle_err(self):
+        # We don't want to let this go silently, so eventually we should log.
+        raise zmq.ZMQError()
+
+    def _handle_recv(self):
+        # Get all of the messages we can
+        while True:
+            try:
+                msg = self.socket.recv_json(zmq.NOBLOCK)
+            except zmq.ZMQError:
+                # Check the errno?
+                # Will this tigger POLLERR?
+                break
+            else:
+                self.call_handlers(msg)
+
+    def _flush(self):
+        """Callback for :method:`self.flush`."""
+        self._flushed = True
 
 
 class RepSocketChannel(ZmqSocketChannel):
+    """A reply channel to handle raw_input requests that the kernel makes."""
+
+    def run(self):
+        """The thread's main activity.  Call start() instead."""
+        self.ioloop = ioloop.IOLoop()
+        self.ioloop.start()
+
+    def stop(self):  # stop the IOLoop, then join the thread via the base class
+        self.ioloop.stop()
+        super(RepSocketChannel, self).stop()
 
     def on_raw_input(self):
         pass
 
 
+#-----------------------------------------------------------------------------
+# Main kernel manager class
+#-----------------------------------------------------------------------------
+
+
 class KernelManager(HasTraits):
     """ Manages a kernel for a frontend.
 
@@ -321,59 +367,66 @@ class KernelManager(HasTraits):
     The REP channel is for the kernel to request stdin (raw_input) from the
     frontend.
     """
-
-    # Whether the kernel manager is currently listening on its channels.
-    is_listening = Bool(False)
-
     # The PyZMQ Context to use for communication with the kernel.
-    context = Instance(zmq.Context, ())
+    context = Instance(zmq.Context)
 
     # The Session to use for communication with the kernel.
-    session = Instance(Session, ())
+    session = Instance(Session)
 
     # The classes to use for the various channels.
-    sub_channel_class = Type(SubSocketChannel)
     xreq_channel_class = Type(XReqSocketChannel)
+    sub_channel_class = Type(SubSocketChannel)
     rep_channel_class = Type(RepSocketChannel)
     
     # Protected traits.
     _kernel = Instance(Popen)
-    _sub_channel = Any
+    _xreq_address = Any
+    _sub_address = Any
+    _rep_address = Any
     _xreq_channel = Any
+    _sub_channel = Any
     _rep_channel = Any
 
+    def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
+                 context=None, session=None):
+        self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
+        self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
+        self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
+        self.context = zmq.Context() if context is None else context
+        self.session = Session() if session is None else session
+
     #--------------------------------------------------------------------------
     # Channel management methods:
     #--------------------------------------------------------------------------
 
-    def start_listening(self):
-        """Starts listening on the specified ports. If already listening, raises
-        a RuntimeError.
+    def start_channels(self):
+        """Starts the channels for this kernel.
+
+        This will create the channels if they do not exist and then start
+        them. If port numbers of 0 are being used (random ports) then you
+        must first call :method:`start_kernel`. If the channels have been
+        stopped and you call this, :class:`RuntimeError` will be raised.
         """
-        if self.is_listening:
-            raise RuntimeError("Cannot start listening. Already listening!")
-        else:
-            self.is_listening = True
-            self.sub_channel.start()
-            self.xreq_channel.start()
-            self.rep_channel.start()
+        self.xreq_channel.start()
+        self.sub_channel.start()
+        self.rep_channel.start()
 
-    @property
-    def is_alive(self):
-        """ Returns whether the kernel is alive. """
-        if self.is_listening:
-            # TODO: check if alive.
-            return True
-        else:
-            return False
+    def stop_channels(self):
+        """Stops the channels for this kernel.
+
+        This stops the channels by joining their threads. If the channels
+        were not started, :class:`RuntimeError` will be raised.
+        """
+        self.xreq_channel.stop()
+        self.sub_channel.stop()
+        self.rep_channel.stop()
 
-    def stop_listening(self):
-        """Stops listening. If not listening, does nothing. """
-        if self.is_listening:
-            self.is_listening = False
-            self.sub_channel.stop()
-            self.xreq_channel.stop()
-            self.rep_channel.stop()
+    @property
+    def channels_running(self):
+        """Are all of the channels created and running?"""
+        return self.xreq_channel.is_alive() \
+            and self.sub_channel.is_alive() \
+            and self.rep_channel.is_alive()
 
     #--------------------------------------------------------------------------
     # Kernel process management methods:
@@ -382,9 +435,8 @@ class KernelManager(HasTraits):
     def start_kernel(self):
         """Starts a kernel process and configures the manager to use it.
 
-        If ports have been specified via the address attributes, they are used.
-        Otherwise, open ports are chosen by the OS and the channel port
-        attributes are configured as appropriate.
+        If random ports (port=0) are being used, this method must be called
+        before the channels are created.
         """
         xreq, sub = self.xreq_address, self.sub_address
         if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
@@ -393,24 +445,13 @@ class KernelManager(HasTraits):
                                "configured properly.")
 
         kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
-        self.set_kernel(kernel)
-        self.xreq_address = (LOCALHOST, xrep)
-        self.sub_address = (LOCALHOST, pub)
-
-    def set_kernel(self, kernel):
-        """Sets the kernel manager's kernel to an existing kernel process.
-
-        It is *not* necessary to a set a kernel to communicate with it via the
-        channels, and those objects must be configured separately. It
-        *is* necessary to set a kernel if you want to use the manager (or
-        frontends that use the manager) to signal and/or kill the kernel.
-
-        Parameters:
-        -----------
-        kernel : Popen
-            An existing kernel process.
-        """
         self._kernel = kernel
+        self._xreq_address = (LOCALHOST, xrep)
+        self._sub_address = (LOCALHOST, pub)
+        # The rep channel is not fully working yet, but its base class makes
+        # sure the port is not 0. We set to -1 for now until the rep channel
+        # is fully working.
+        self._rep_address = (LOCALHOST, -1)
 
     @property
     def has_kernel(self):
@@ -423,7 +464,7 @@ class KernelManager(HasTraits):
 
     def kill_kernel(self):
         """ Kill the running kernel. """
-        if self._kernel:
+        if self._kernel is not None:
             self._kernel.kill()
             self._kernel = None
         else:
@@ -431,67 +472,65 @@ class KernelManager(HasTraits):
 
     def signal_kernel(self, signum):
         """ Sends a signal to the kernel. """
-        if self._kernel:
+        if self._kernel is not None:
             self._kernel.send_signal(signum)
         else:
             raise RuntimeError("Cannot signal kernel. No kernel is running!")
 
+    @property
+    def is_alive(self):
+        """Is the kernel process still running?"""
+        if self._kernel is not None:
+            if self._kernel.poll() is None:
+                return True
+            else:
+                return False
+        else:
+            # We didn't start the kernel with this KernelManager so we don't
+            # know if it is running. We should use a heartbeat for this case.
+            return True
+
     #--------------------------------------------------------------------------
     # Channels used for communication with the kernel:
     #--------------------------------------------------------------------------
 
     @property
-    def sub_channel(self):
-        """Get the SUB socket channel object."""
-        if self._sub_channel is None:
-            self._sub_channel = self.sub_channel_class(self.context,
-                                                       self.session)
-        return self._sub_channel
-
-    @property
     def xreq_channel(self):
         """Get the REQ socket channel object to make requests of the kernel."""
         if self._xreq_channel is None:
             self._xreq_channel = self.xreq_channel_class(self.context, 
-                                                         self.session)
+                                                         self.session,
+                                                         self.xreq_address)
         return self._xreq_channel
 
     @property
+    def sub_channel(self):
+        """Get the SUB socket channel object."""
+        if self._sub_channel is None:
+            self._sub_channel = self.sub_channel_class(self.context,
+                                                       self.session,
+                                                       self.sub_address)
+        return self._sub_channel
+
+    @property
     def rep_channel(self):
         """Get the REP socket channel object to handle stdin (raw_input)."""
         if self._rep_channel is None:
             self._rep_channel = self.rep_channel_class(self.context, 
-                                                       self.session)
+                                                       self.session,
+                                                       self.rep_address)
         return self._rep_channel
 
-    #--------------------------------------------------------------------------
-    # Delegates for the Channel address attributes:
-    #--------------------------------------------------------------------------
-
-    def get_sub_address(self):
-        return self.sub_channel.address
-
-    def set_sub_address(self, address):
-        self.sub_channel.address = address
-
-    sub_address = property(get_sub_address, set_sub_address,
-                           doc="The address used by SUB socket channel.")
-
-    def get_xreq_address(self):
-        return self.xreq_channel.address
-
-    def set_xreq_address(self, address):
-        self.xreq_channel.address = address
+    @property
+    def xreq_address(self):
+        return self._xreq_address
 
-    xreq_address = property(get_xreq_address, set_xreq_address,
-                            doc="The address used by XREQ socket channel.")
-    
-    def get_rep_address(self):
-        return self.rep_channel.address
+    @property
+    def sub_address(self):
+        return self._sub_address
 
-    def set_rep_address(self, address):
-        self.rep_channel.address = address
+    @property
+    def rep_address(self):
+        return self._rep_address
 
-    rep_address = property(get_rep_address, set_rep_address,
-                           doc="The address used by REP socket channel.")