Add pause/unpause support to the kernel heartbeat channel.

HBSocketChannel gains pause(), unpause() and is_beating().  KernelManager's
start_channels(), stop_channels() and channels_running no longer start, stop
or inspect the heartbeat channel.

diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py
index f82fc87..f04fb04 100644
--- a/IPython/zmq/kernelmanager.py
+++ b/IPython/zmq/kernelmanager.py
@@ -515,10 +515,13 @@ class HBSocketChannel(ZmqSocketChannel):
     time_to_dead = 3.0
     socket = None
     poller = None
+    _running = None
+    _pause = None
 
     def __init__(self, context, session, address):
         super(HBSocketChannel, self).__init__(context, session, address)
         self._running = False
+        self._pause = False
 
     def _create_socket(self):
         self.socket = self.context.socket(zmq.REQ)
@@ -538,51 +541,67 @@ class HBSocketChannel(ZmqSocketChannel):
         # after the sockets are connected.
         time.sleep(2.0)
         while self._running:
-            since_last_heartbeat = 0.0
-            request_time = time.time()
-            try:
-                #io.rprint('Ping from HB channel') # dbg
-                self.socket.send_json('ping')
-            except zmq.ZMQError, e:
-                #io.rprint('*** HB Error:', e) # dbg
-                if e.errno == zmq.EFSM:
-                    #io.rprint('sleep...', self.time_to_dead) # dbg
-                    time.sleep(self.time_to_dead)
-                    self._create_socket()
-                else:
-                    raise
+            if self._pause:
+                time.sleep(self.time_to_dead)
             else:
-                while True:
-                    try:
-                        self.socket.recv_json(zmq.NOBLOCK)
-                    except zmq.ZMQError, e:
-                        #io.rprint('*** HB Error 2:', e) # dbg
-                        if e.errno == zmq.EAGAIN:
-                            before_poll = time.time()
-                            until_dead = self.time_to_dead - (before_poll -
-                                                              request_time)
-
-                            # When the return value of poll() is an empty list,
-                            # that is when things have gone wrong (zeromq bug).
-                            # As long as it is not an empty list, poll is
-                            # working correctly even if it returns quickly.
-                            # Note: poll timeout is in milliseconds.
-                            self.poller.poll(1000*until_dead)
+                since_last_heartbeat = 0.0
+                request_time = time.time()
+                try:
+                    #io.rprint('Ping from HB channel') # dbg
+                    self.socket.send_json('ping')
+                except zmq.ZMQError, e:
+                    #io.rprint('*** HB Error:', e) # dbg
+                    if e.errno == zmq.EFSM:
+                        #io.rprint('sleep...', self.time_to_dead) # dbg
+                        time.sleep(self.time_to_dead)
+                        self._create_socket()
+                    else:
+                        raise
+                else:
+                    while True:
+                        try:
+                            self.socket.recv_json(zmq.NOBLOCK)
+                        except zmq.ZMQError, e:
+                            #io.rprint('*** HB Error 2:', e) # dbg
+                            if e.errno == zmq.EAGAIN:
+                                before_poll = time.time()
+                                until_dead = self.time_to_dead - (before_poll -
+                                                                  request_time)
+
+                                # When the return value of poll() is an empty list,
+                                # that is when things have gone wrong (zeromq bug).
+                                # As long as it is not an empty list, poll is
+                                # working correctly even if it returns quickly.
+                                # Note: poll timeout is in milliseconds.
+                                # Clamp at zero: a negative timeout can block forever.
+                                self.poller.poll(1000*max(until_dead, 0.0))
 
-                            since_last_heartbeat = time.time() - request_time
-                            if since_last_heartbeat > self.time_to_dead:
-                                self.call_handlers(since_last_heartbeat)
-                                break
+                                since_last_heartbeat = time.time() - request_time
+                                if since_last_heartbeat > self.time_to_dead:
+                                    self.call_handlers(since_last_heartbeat)
+                                    break
+                            else:
+                                # FIXME: We should probably log this instead.
+                                raise
                         else:
-                            # FIXME: We should probably log this instead.
-                            raise
-                    else:
-                        until_dead = self.time_to_dead - (time.time() -
-                                                          request_time)
-                        if until_dead > 0.0:
-                            #io.rprint('sleep...', self.time_to_dead) # dbg
-                            time.sleep(until_dead)
-                        break
+                            until_dead = self.time_to_dead - (time.time() -
+                                                              request_time)
+                            if until_dead > 0.0:
+                                #io.rprint('sleep...', self.time_to_dead) # dbg
+                                time.sleep(until_dead)
+                            break
+
+    def pause(self):
+        """Pause the heartbeat; the run loop idles until unpause() is called."""
+        self._pause = True
+
+    def unpause(self):
+        """Unpause the heartbeat so the run loop resumes pinging."""
+        self._pause = False
+
+    def is_beating(self):
+        """Return True if the channel is alive and the heartbeat is not paused."""
+        return self.is_alive() and not self._pause
 
     def stop(self):
         self._running = False
@@ -646,8 +665,8 @@ class KernelManager(HasTraits):
     # Channel management methods:
     #--------------------------------------------------------------------------
 
-    def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
-        """Starts the channels for this kernel.
+    def start_channels(self, xreq=True, sub=True, rep=True):
+        """Starts the channels for this kernel, but not the heartbeat.
 
         This will create the channels if they do not exist and then start
         them. If port numbers of 0 are being used (random ports) then you
@@ -660,8 +679,6 @@ class KernelManager(HasTraits):
             self.sub_channel.start()
         if rep:
             self.rep_channel.start()
-        if hb:
-            self.hb_channel.start()
 
     def stop_channels(self):
         """Stops all the running channels for this kernel.
@@ -672,16 +689,13 @@ class KernelManager(HasTraits):
             self.sub_channel.stop()
         if self.rep_channel.is_alive():
             self.rep_channel.stop()
-        if self.hb_channel.is_alive():
-            self.hb_channel.stop()
 
     @property
     def channels_running(self):
         """Are any of the channels created and running?"""
         return self.xreq_channel.is_alive() \
             or self.sub_channel.is_alive() \
-            or self.rep_channel.is_alive() \
-            or self.hb_channel.is_alive()
+            or self.rep_channel.is_alive()
 
     #--------------------------------------------------------------------------
     # Kernel process management methods: