From 34c425106770628b492294766dd66b7d2c3b4f3d 2014-12-05 01:49:40 From: Thomas Kluyver Date: 2014-12-05 01:49:40 Subject: [PATCH] Collapse ZMQSocketChannel into HBChannel class --- diff --git a/IPython/kernel/channels.py b/IPython/kernel/channels.py index d4d98e2..42ffe09 100644 --- a/IPython/kernel/channels.py +++ b/IPython/kernel/channels.py @@ -62,124 +62,6 @@ def validate_string_dict(dct): raise ValueError('value %r in dict must be a string' % v) -#----------------------------------------------------------------------------- -# ZMQ Socket Channel classes -#----------------------------------------------------------------------------- - -class ZMQSocketChannel(Thread): - """The base class for the channels that use ZMQ sockets.""" - context = None - session = None - socket = None - ioloop = None - stream = None - _address = None - _exiting = False - proxy_methods = [] - - def __init__(self, context, session, address): - """Create a channel. - - Parameters - ---------- - context : :class:`zmq.Context` - The ZMQ context to use. - session : :class:`session.Session` - The session to use. - address : zmq url - Standard (ip, port) tuple that the kernel is listening on. - """ - super(ZMQSocketChannel, self).__init__() - self.daemon = True - - self.context = context - self.session = session - if isinstance(address, tuple): - if address[1] == 0: - message = 'The port number for a channel cannot be 0.' - raise InvalidPortNumber(message) - address = "tcp://%s:%i" % address - self._address = address - atexit.register(self._notice_exit) - - def _notice_exit(self): - self._exiting = True - - def _run_loop(self): - """Run my loop, ignoring EINTR events in the poller""" - while True: - try: - self.ioloop.start() - except ZMQError as e: - if e.errno == errno.EINTR: - continue - else: - raise - except Exception: - if self._exiting: - break - else: - raise - else: - break - - def stop(self): - """Stop the channel's event loop and join its thread. - - This calls :meth:`~threading.Thread.join` and returns when the thread - terminates. :class:`RuntimeError` will be raised if - :meth:`~threading.Thread.start` is called again. - """ - if self.ioloop is not None: - self.ioloop.stop() - self.join() - self.close() - - def close(self): - if self.ioloop is not None: - try: - self.ioloop.close(all_fds=True) - except Exception: - pass - if self.socket is not None: - try: - self.socket.close(linger=0) - except Exception: - pass - self.socket = None - - @property - def address(self): - """Get the channel's address as a zmq url string. - - These URLS have the form: 'tcp://127.0.0.1:5555'. - """ - return self._address - - def _queue_send(self, msg): - """Queue a message to be sent from the IOLoop's thread. - - Parameters - ---------- - msg : message to send - - This is threadsafe, as it uses IOLoop.add_callback to give the loop's - thread control of the action. - """ - def thread_send(): - self.session.send(self.stream, msg) - self.ioloop.add_callback(thread_send) - - def _handle_recv(self, msg): - """Callback for stream.on_recv. - - Unpacks message, and calls handlers with it. - """ - ident,smsg = self.session.feed_identities(msg) - msg = self.session.deserialize(smsg) - self.call_handlers(msg) - - def make_shell_socket(context, identity, address): socket = context.socket(zmq.DEALER) socket.linger = 1000 @@ -202,27 +84,57 @@ def make_stdin_socket(context, identity, address): socket.connect(address) return socket -class HBChannel(ZMQSocketChannel): +class HBChannel(Thread): """The heartbeat channel which monitors the kernel heartbeat. Note that the heartbeat channel is paused by default. As long as you start this channel, the kernel manager will ensure that it is paused and un-paused as appropriate. """ + context = None + session = None + socket = None + address = None + _exiting = False time_to_dead = 1. - socket = None poller = None _running = None _pause = None _beating = None def __init__(self, context, session, address): - super(HBChannel, self).__init__(context, session, address) + """Create the heartbeat monitor thread. + + Parameters + ---------- + context : :class:`zmq.Context` + The ZMQ context to use. + session : :class:`session.Session` + The session to use. + address : zmq url + Standard (ip, port) tuple that the kernel is listening on. + """ + super(HBChannel, self).__init__() + self.daemon = True + + self.context = context + self.session = session + if isinstance(address, tuple): + if address[1] == 0: + message = 'The port number for a channel cannot be 0.' + raise InvalidPortNumber(message) + address = "tcp://%s:%i" % address + self.address = address + atexit.register(self._notice_exit) + self._running = False - self._pause =True + self._pause = True self.poller = zmq.Poller() + def _notice_exit(self): + self._exiting = True + def _create_socket(self): if self.socket is not None: # close previous socket, before opening a new one @@ -322,7 +234,16 @@ class HBChannel(ZMQSocketChannel): def stop(self): """Stop the channel's event loop and join its thread.""" self._running = False - super(HBChannel, self).stop() + self.join() + self.close() + + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None def call_handlers(self, since_last_heartbeat): """This method is called in the ioloop thread when a message arrives.