##// END OF EJS Templates
Added pausing to the heartbeat channel.
Brian Granger -
Show More
@@ -515,10 +515,13 b' class HBSocketChannel(ZmqSocketChannel):'
515 time_to_dead = 3.0
515 time_to_dead = 3.0
516 socket = None
516 socket = None
517 poller = None
517 poller = None
518 _running = None
519 _pause = None
518
520
519 def __init__(self, context, session, address):
521 def __init__(self, context, session, address):
520 super(HBSocketChannel, self).__init__(context, session, address)
522 super(HBSocketChannel, self).__init__(context, session, address)
521 self._running = False
523 self._running = False
524 self._pause = False
522
525
523 def _create_socket(self):
526 def _create_socket(self):
524 self.socket = self.context.socket(zmq.REQ)
527 self.socket = self.context.socket(zmq.REQ)
@@ -538,51 +541,69 b' class HBSocketChannel(ZmqSocketChannel):'
538 # after the sockets are connected.
541 # after the sockets are connected.
539 time.sleep(2.0)
542 time.sleep(2.0)
540 while self._running:
543 while self._running:
541 since_last_heartbeat = 0.0
544 if self._pause:
542 request_time = time.time()
545 time.sleep(self.time_to_dead)
543 try:
544 #io.rprint('Ping from HB channel') # dbg
545 self.socket.send_json('ping')
546 except zmq.ZMQError, e:
547 #io.rprint('*** HB Error:', e) # dbg
548 if e.errno == zmq.EFSM:
549 #io.rprint('sleep...', self.time_to_dead) # dbg
550 time.sleep(self.time_to_dead)
551 self._create_socket()
552 else:
553 raise
554 else:
546 else:
555 while True:
547 since_last_heartbeat = 0.0
556 try:
548 request_time = time.time()
557 self.socket.recv_json(zmq.NOBLOCK)
549 try:
558 except zmq.ZMQError, e:
550 #io.rprint('Ping from HB channel') # dbg
559 #io.rprint('*** HB Error 2:', e) # dbg
551 self.socket.send_json('ping')
560 if e.errno == zmq.EAGAIN:
552 except zmq.ZMQError, e:
561 before_poll = time.time()
553 #io.rprint('*** HB Error:', e) # dbg
562 until_dead = self.time_to_dead - (before_poll -
554 if e.errno == zmq.EFSM:
563 request_time)
555 #io.rprint('sleep...', self.time_to_dead) # dbg
564
556 time.sleep(self.time_to_dead)
565 # When the return value of poll() is an empty list,
557 self._create_socket()
566 # that is when things have gone wrong (zeromq bug).
558 else:
567 # As long as it is not an empty list, poll is
559 raise
568 # working correctly even if it returns quickly.
560 else:
569 # Note: poll timeout is in milliseconds.
561 while True:
570 self.poller.poll(1000*until_dead)
562 try:
563 self.socket.recv_json(zmq.NOBLOCK)
564 except zmq.ZMQError, e:
565 #io.rprint('*** HB Error 2:', e) # dbg
566 if e.errno == zmq.EAGAIN:
567 before_poll = time.time()
568 until_dead = self.time_to_dead - (before_poll -
569 request_time)
570
571 # When the return value of poll() is an empty list,
572 # that is when things have gone wrong (zeromq bug).
573 # As long as it is not an empty list, poll is
574 # working correctly even if it returns quickly.
575 # Note: poll timeout is in milliseconds.
576 self.poller.poll(1000*until_dead)
571
577
572 since_last_heartbeat = time.time() - request_time
578 since_last_heartbeat = time.time() - request_time
573 if since_last_heartbeat > self.time_to_dead:
579 if since_last_heartbeat > self.time_to_dead:
574 self.call_handlers(since_last_heartbeat)
580 self.call_handlers(since_last_heartbeat)
575 break
581 break
582 else:
583 # FIXME: We should probably log this instead.
584 raise
576 else:
585 else:
577 # FIXME: We should probably log this instead.
586 until_dead = self.time_to_dead - (time.time() -
578 raise
587 request_time)
579 else:
588 if until_dead > 0.0:
580 until_dead = self.time_to_dead - (time.time() -
589 #io.rprint('sleep...', self.time_to_dead) # dbg
581 request_time)
590 time.sleep(until_dead)
582 if until_dead > 0.0:
591 break
583 #io.rprint('sleep...', self.time_to_dead) # dbg
592
584 time.sleep(until_dead)
593 def pause(self):
585 break
594 """Pause the heartbeat."""
595 self._pause = True
596
597 def unpause(self):
598 """Unpause the heartbeat."""
599 self._pause = False
600
601 def is_beating(self):
602 """Is the heartbeat running and not paused."""
603 if self.is_alive() and not self._pause:
604 return True
605 else:
606 return False
586
607
587 def stop(self):
608 def stop(self):
588 self._running = False
609 self._running = False
@@ -646,8 +667,8 b' class KernelManager(HasTraits):'
646 # Channel management methods:
667 # Channel management methods:
647 #--------------------------------------------------------------------------
668 #--------------------------------------------------------------------------
648
669
649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 def start_channels(self, xreq=True, sub=True, rep=True):
650 """Starts the channels for this kernel.
671 """Starts the channels for this kernel, but not the heartbeat.
651
672
652 This will create the channels if they do not exist and then start
673 This will create the channels if they do not exist and then start
653 them. If port numbers of 0 are being used (random ports) then you
674 them. If port numbers of 0 are being used (random ports) then you
@@ -660,8 +681,6 b' class KernelManager(HasTraits):'
660 self.sub_channel.start()
681 self.sub_channel.start()
661 if rep:
682 if rep:
662 self.rep_channel.start()
683 self.rep_channel.start()
663 if hb:
664 self.hb_channel.start()
665
684
666 def stop_channels(self):
685 def stop_channels(self):
667 """Stops all the running channels for this kernel.
686 """Stops all the running channels for this kernel.
@@ -672,16 +691,13 b' class KernelManager(HasTraits):'
672 self.sub_channel.stop()
691 self.sub_channel.stop()
673 if self.rep_channel.is_alive():
692 if self.rep_channel.is_alive():
674 self.rep_channel.stop()
693 self.rep_channel.stop()
675 if self.hb_channel.is_alive():
676 self.hb_channel.stop()
677
694
678 @property
695 @property
679 def channels_running(self):
696 def channels_running(self):
680 """Are any of the channels created and running?"""
697 """Are any of the channels created and running?"""
681 return self.xreq_channel.is_alive() \
698 return self.xreq_channel.is_alive() \
682 or self.sub_channel.is_alive() \
699 or self.sub_channel.is_alive() \
683 or self.rep_channel.is_alive() \
700 or self.rep_channel.is_alive()
684 or self.hb_channel.is_alive()
685
701
686 #--------------------------------------------------------------------------
702 #--------------------------------------------------------------------------
687 # Kernel process management methods:
703 # Kernel process management methods:
General Comments 0
You need to be logged in to leave comments. Login now