diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index fa78d60..848ede6 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -28,14 +28,37 @@ class ZmqSocketChannel(Thread): """ The base class for the channels that use ZMQ sockets. """ - def __init__(self, context, session, addr=None): + def __init__(self, context, session, address=None): + super(ZmqSocketChannel, self).__init__() + self.daemon = True + self.context = context self.session = session - self.addr = addr + self.address = address self.socket = None - super(ZmqSocketChannel, self).__init__() - self.daemon = True + def stop(self): + """ Stop the thread's activity. Returns when the thread terminates. + """ + raise NotImplementedError + + def get_address(self): + """ Get the channel's address. + """ + return self._address + + def set_adresss(self, address): + """ Set the channel's address. Should be a tuple of form: + (ip address [str], port [int]) + or 'None' to indicate that no address has been specified. + """ + # FIXME: Validate address. + if self.is_alive(): + raise RuntimeError("Cannot set address on a running channel!") + else: + self._address = address + + address = property(get_address, set_adresss) class SubSocketChannel(ZmqSocketChannel): @@ -43,20 +66,24 @@ class SubSocketChannel(ZmqSocketChannel): handlers = None _overriden_call_handler = None - def __init__(self, context, session, addr=None): + def __init__(self, context, session, address=None): self.handlers = {} - super(SubSocketChannel, self).__init__(context, session, addr) + super(SubSocketChannel, self).__init__(context, session, address) def run(self): self.socket = self.context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE,'') self.socket.setsockopt(zmq.IDENTITY, self.session.session) - self.socket.connect('tcp://%s:%i' % self.addr) + self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() self.ioloop.add_handler(self.socket, self._handle_events, POLLIN|POLLERR) self.ioloop.start() + def stop(self): + self.ioloop.stop() + self.join() + def _handle_events(self, socket, events): # Turn on and off POLLOUT depending on if we have made a request if events & POLLERR: @@ -136,21 +163,25 @@ class XReqSocketChannel(ZmqSocketChannel): handlers = None _overriden_call_handler = None - def __init__(self, context, session, addr=None): + def __init__(self, context, session, address=None): self.handlers = {} self.handler_queue = Queue() self.command_queue = Queue() - super(XReqSocketChannel, self).__init__(context, session, addr) + super(XReqSocketChannel, self).__init__(context, session, address) def run(self): self.socket = self.context.socket(zmq.XREQ) self.socket.setsockopt(zmq.IDENTITY, self.session.session) - self.socket.connect('tcp://%s:%i' % self.addr) + self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() self.ioloop.add_handler(self.socket, self._handle_events, POLLIN|POLLOUT|POLLERR) self.ioloop.start() + def stop(self): + self.ioloop.stop() + self.join() + def _handle_events(self, socket, events): # Turn on and off POLLOUT depending on if we have made a request if events & POLLERR: @@ -246,7 +277,10 @@ class XReqSocketChannel(ZmqSocketChannel): class RepSocketChannel(ZmqSocketChannel): - def on_raw_input(): + def stop(self): + pass + + def on_raw_input(self): pass @@ -268,22 +302,10 @@ class KernelManager(HasTraits): # The Session to use for communication with the kernel. session = Instance(Session, ()) - # The channels objects used for communication with the kernel. - # FIXME: Add '_traitname_default' instantiation method to Traitlets. - #sub_channel = Instance(SubSocketChannel) - #xreq_channel = Instance(XReqSocketChannel) - #rep_channel = Instance(RepSocketChannel) - # The classes to use for the various channels. sub_channel_class = Type(SubSocketChannel) xreq_channel_class = Type(XReqSocketChannel) rep_channel_class = Type(RepSocketChannel) - - # The addresses to use for the various channels. Should be tuples of form - # (ip_address, port). - #sub_address = DelegatesTo('sub_channel') - #xreq_address = DelegatesTo('xreq_channel') - #rep_address = DelegatesTo('rep_channel') # Protected traits. _sub_channel = Any @@ -302,9 +324,16 @@ class KernelManager(HasTraits): """Start a localhost kernel. If ports have been specified, use them. Otherwise, choose an open port at random. """ + self.sub_channel.start() + self.xreq_channel.start() + self.rep_channel.start() def kill_kernel(self): - """Kill the running kernel""" + """Kill the running kernel. + """ + self.sub_channel.stop() + self.xreq_channel.stop() + self.rep_channel.stop() def is_alive(self): """Is the kernel alive?""" @@ -313,6 +342,10 @@ class KernelManager(HasTraits): def signal_kernel(self, signum): """Send signum to the kernel.""" + #-------------------------------------------------------------------------- + # Channels used for communication with the kernel: + #-------------------------------------------------------------------------- + @property def sub_channel(self): """Get the SUB socket channel object.""" @@ -337,23 +370,34 @@ class KernelManager(HasTraits): self.session) return self._rep_channel + #-------------------------------------------------------------------------- + # Channel addresses: + #-------------------------------------------------------------------------- + def get_sub_address(self): - return self.sub_channel.addr - def set_sub_address(self, addr): - self.sub_channel.addr = addr + return self.sub_channel.address + + def set_sub_address(self, address): + self.sub_channel.address = address + sub_address = property(get_sub_address, set_sub_address, doc="The address used by SUB socket channel.") def get_xreq_address(self): - return self.xreq_channel.addr - def set_xreq_address(self, addr): - self.xreq_channel.addr = addr + return self.xreq_channel.address + + def set_xreq_address(self, address): + self.xreq_channel.address = address + xreq_address = property(get_xreq_address, set_xreq_address, doc="The address used by XREQ socket channel.") def get_rep_address(self): - return self.rep_channel.addr - def set_rep_address(self, addr): - self.rep_channel.addr = addr + return self.rep_channel.address + + def set_rep_address(self, address): + self.rep_channel.address = address + rep_address = property(get_rep_address, set_rep_address, doc="The address used by REP socket channel.") +