From f7e44e62cef04b43ca953f5b068d908d7ee2b367 2011-12-06 01:40:59 From: MinRK Date: 2011-12-06 01:40:59 Subject: [PATCH] Fixes to the heartbeat channel * The heartbeat channel had some erroneous zeromq logic, and entirely false comments (as described in #967). This has been fixed. * KernelManager.is_alive() checks whether the hb_channel is beating when the kernel is not owned, rather than always returning True. * BlockingKM's hb_channel has been relaxed to 1s polling, because replies are not reliably much faster than that. There are occasional >0.5s outlier responses. --- diff --git a/IPython/zmq/blockingkernelmanager.py b/IPython/zmq/blockingkernelmanager.py index c250fe0..581a844 100644 --- a/IPython/zmq/blockingkernelmanager.py +++ b/IPython/zmq/blockingkernelmanager.py @@ -134,11 +134,13 @@ class BlockingStdInSocketChannel(StdInSocketChannel): class BlockingHBSocketChannel(HBSocketChannel): - # This kernel needs rapid monitoring capabilities - time_to_dead = 0.2 + # This kernel needs quicker monitoring, shorten to 1 sec. + # less than 0.5s is unreliable, and will get occasional + # false reports of missed beats. + time_to_dead = 1. 
def call_handlers(self, since_last_heartbeat): - #io.rprint('[[Heart]]', since_last_heartbeat) # dbg + """pause beating on missed heartbeat""" pass diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index dc7a0fe..4c3b8c1 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -471,83 +471,89 @@ class HBSocketChannel(ZMQSocketChannel): poller = None _running = None _pause = None + _beating = None def __init__(self, context, session, address): super(HBSocketChannel, self).__init__(context, session, address) self._running = False - self._pause = True + self._pause =True + self.poller = zmq.Poller() def _create_socket(self): + if self.socket is not None: + # close previous socket, before opening a new one + self.poller.unregister(self.socket) + self.socket.close() self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) + self.socket.setsockopt(zmq.LINGER, 0) self.socket.connect('tcp://%s:%i' % self.address) - self.poller = zmq.Poller() + self.poller.register(self.socket, zmq.POLLIN) + + def _poll(self, start_time): + """poll for heartbeat replies until we reach self.time_to_dead + + Ignores interrupts, and returns the result of poll(), which + will be an empty list if no messages arrived before the timeout, + or the event tuple if there is a message to receive. + """ + + until_dead = self.time_to_dead - (time.time() - start_time) + # ensure poll at least once + until_dead = max(until_dead, 1e-3) + events = [] + while True: + try: + events = self.poller.poll(1000 * until_dead) + except zmq.ZMQError as e: + if e.errno == errno.EINTR: + # ignore interrupts during heartbeat + # this may never actually happen + until_dead = self.time_to_dead - (time.time() - start_time) + until_dead = max(until_dead, 1e-3) + pass + else: + raise + else: + break + return events def run(self): """The thread's main activity. 
Call start() instead.""" self._create_socket() self._running = True + self._beating = True + while self._running: if self._pause: + # just sleep, and skip the rest of the loop time.sleep(self.time_to_dead) + continue + + since_last_heartbeat = 0.0 + # io.rprint('Ping from HB channel') # dbg + # no need to catch EFSM here, because the previous event was + # either a recv or connect, which cannot be followed by EFSM + self.socket.send(b'ping') + request_time = time.time() + ready = self._poll(request_time) + if ready: + self._beating = True + # the poll above guarantees we have something to recv + self.socket.recv() + # sleep the remainder of the cycle + remainder = self.time_to_dead - (time.time() - request_time) + if remainder > 0: + time.sleep(remainder) + continue else: - since_last_heartbeat = 0.0 - request_time = time.time() - try: - #io.rprint('Ping from HB channel') # dbg - self.socket.send(b'ping') - except zmq.ZMQError, e: - #io.rprint('*** HB Error:', e) # dbg - if e.errno == zmq.EFSM: - #io.rprint('sleep...', self.time_to_dead) # dbg - time.sleep(self.time_to_dead) - self._create_socket() - else: - raise - else: - while True: - try: - self.socket.recv(zmq.NOBLOCK) - except zmq.ZMQError, e: - #io.rprint('*** HB Error 2:', e) # dbg - if e.errno == zmq.EAGAIN: - before_poll = time.time() - until_dead = self.time_to_dead - (before_poll - - request_time) - - # When the return value of poll() is an empty - # list, that is when things have gone wrong - # (zeromq bug). As long as it is not an empty - # list, poll is working correctly even if it - # returns quickly. Note: poll timeout is in - # milliseconds. 
- if until_dead > 0.0: - while True: - try: - self.poller.poll(1000 * until_dead) - except zmq.ZMQError as e: - if e.errno == errno.EINTR: - continue - else: - raise - else: - break - - since_last_heartbeat = time.time()-request_time - if since_last_heartbeat > self.time_to_dead: - self.call_handlers(since_last_heartbeat) - break - else: - # FIXME: We should probably log this instead. - raise - else: - until_dead = self.time_to_dead - (time.time() - - request_time) - if until_dead > 0.0: - #io.rprint('sleep...', self.time_to_dead) # dbg - time.sleep(until_dead) - break + # nothing was received within the time limit, signal heart failure + self._beating = False + since_last_heartbeat = time.time() - request_time + self.call_handlers(since_last_heartbeat) + # and close/reopen the socket, because the REQ/REP cycle has been broken + self._create_socket() + continue def pause(self): """Pause the heartbeat.""" @@ -558,8 +564,8 @@ class HBSocketChannel(ZMQSocketChannel): self._pause = False def is_beating(self): - """Is the heartbeat running and not paused.""" - if self.is_alive() and not self._pause: + """Is the heartbeat running and responsive (and not paused).""" + if self.is_alive() and not self._pause and self._beating: return True else: return False @@ -573,7 +579,7 @@ class HBSocketChannel(ZMQSocketChannel): Subclasses should override this method to handle incoming messages. It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application leve + so that some logic must be done to ensure that the application level handlers are called in the application thread. """ raise NotImplementedError('call_handlers must be defined in a subclass.') @@ -900,16 +906,18 @@ class KernelManager(HasTraits): @property def is_alive(self): """Is the kernel process still running?""" - # FIXME: not using a heartbeat means this method is broken for any - # remote kernel, it's only capable of handling local kernels. 
if self.has_kernel: if self.kernel.poll() is None: return True else: return False + elif self._hb_channel is not None: + # We didn't start the kernel with this KernelManager so we + # use the heartbeat. + return self._hb_channel.is_beating() else: - # We didn't start the kernel with this KernelManager so we don't - # know if it is running. We should use a heartbeat for this case. + # no heartbeat and not local, we can't tell if it's running, + # so naively return True return True #--------------------------------------------------------------------------