##// END OF EJS Templates
Added pausing to the heartbeat channel.
Brian Granger -
Show More
@@ -515,10 +515,13 b' class HBSocketChannel(ZmqSocketChannel):'
515 515 time_to_dead = 3.0
516 516 socket = None
517 517 poller = None
518 _running = None
519 _pause = None
518 520
519 521 def __init__(self, context, session, address):
520 522 super(HBSocketChannel, self).__init__(context, session, address)
521 523 self._running = False
524 self._pause = False
522 525
523 526 def _create_socket(self):
524 527 self.socket = self.context.socket(zmq.REQ)
@@ -538,51 +541,69 b' class HBSocketChannel(ZmqSocketChannel):'
538 541 # after the sockets are connected.
539 542 time.sleep(2.0)
540 543 while self._running:
541 since_last_heartbeat = 0.0
542 request_time = time.time()
543 try:
544 #io.rprint('Ping from HB channel') # dbg
545 self.socket.send_json('ping')
546 except zmq.ZMQError, e:
547 #io.rprint('*** HB Error:', e) # dbg
548 if e.errno == zmq.EFSM:
549 #io.rprint('sleep...', self.time_to_dead) # dbg
550 time.sleep(self.time_to_dead)
551 self._create_socket()
552 else:
553 raise
544 if self._pause:
545 time.sleep(self.time_to_dead)
554 546 else:
555 while True:
556 try:
557 self.socket.recv_json(zmq.NOBLOCK)
558 except zmq.ZMQError, e:
559 #io.rprint('*** HB Error 2:', e) # dbg
560 if e.errno == zmq.EAGAIN:
561 before_poll = time.time()
562 until_dead = self.time_to_dead - (before_poll -
563 request_time)
564
565 # When the return value of poll() is an empty list,
566 # that is when things have gone wrong (zeromq bug).
567 # As long as it is not an empty list, poll is
568 # working correctly even if it returns quickly.
569 # Note: poll timeout is in milliseconds.
570 self.poller.poll(1000*until_dead)
547 since_last_heartbeat = 0.0
548 request_time = time.time()
549 try:
550 #io.rprint('Ping from HB channel') # dbg
551 self.socket.send_json('ping')
552 except zmq.ZMQError, e:
553 #io.rprint('*** HB Error:', e) # dbg
554 if e.errno == zmq.EFSM:
555 #io.rprint('sleep...', self.time_to_dead) # dbg
556 time.sleep(self.time_to_dead)
557 self._create_socket()
558 else:
559 raise
560 else:
561 while True:
562 try:
563 self.socket.recv_json(zmq.NOBLOCK)
564 except zmq.ZMQError, e:
565 #io.rprint('*** HB Error 2:', e) # dbg
566 if e.errno == zmq.EAGAIN:
567 before_poll = time.time()
568 until_dead = self.time_to_dead - (before_poll -
569 request_time)
570
571 # When the return value of poll() is an empty list,
572 # that is when things have gone wrong (zeromq bug).
573 # As long as it is not an empty list, poll is
574 # working correctly even if it returns quickly.
575 # Note: poll timeout is in milliseconds.
576 self.poller.poll(1000*until_dead)
571 577
572 since_last_heartbeat = time.time() - request_time
573 if since_last_heartbeat > self.time_to_dead:
574 self.call_handlers(since_last_heartbeat)
575 break
578 since_last_heartbeat = time.time() - request_time
579 if since_last_heartbeat > self.time_to_dead:
580 self.call_handlers(since_last_heartbeat)
581 break
582 else:
583 # FIXME: We should probably log this instead.
584 raise
576 585 else:
577 # FIXME: We should probably log this instead.
578 raise
579 else:
580 until_dead = self.time_to_dead - (time.time() -
581 request_time)
582 if until_dead > 0.0:
583 #io.rprint('sleep...', self.time_to_dead) # dbg
584 time.sleep(until_dead)
585 break
586 until_dead = self.time_to_dead - (time.time() -
587 request_time)
588 if until_dead > 0.0:
589 #io.rprint('sleep...', self.time_to_dead) # dbg
590 time.sleep(until_dead)
591 break
592
593 def pause(self):
594 """Pause the heartbeat."""
595 self._pause = True
596
597 def unpause(self):
598 """Unpause the heartbeat."""
599 self._pause = False
600
601 def is_beating(self):
602 """Is the heartbeat running and not paused."""
603 if self.is_alive() and not self._pause:
604 return True
605 else:
606 return False
586 607
587 608 def stop(self):
588 609 self._running = False
@@ -646,8 +667,8 b' class KernelManager(HasTraits):'
646 667 # Channel management methods:
647 668 #--------------------------------------------------------------------------
648 669
649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
650 """Starts the channels for this kernel.
670 def start_channels(self, xreq=True, sub=True, rep=True):
671 """Starts the channels for this kernel, but not the heartbeat.
651 672
652 673 This will create the channels if they do not exist and then start
653 674 them. If port numbers of 0 are being used (random ports) then you
@@ -660,8 +681,6 b' class KernelManager(HasTraits):'
660 681 self.sub_channel.start()
661 682 if rep:
662 683 self.rep_channel.start()
663 if hb:
664 self.hb_channel.start()
665 684
666 685 def stop_channels(self):
667 686 """Stops all the running channels for this kernel.
@@ -672,16 +691,13 b' class KernelManager(HasTraits):'
672 691 self.sub_channel.stop()
673 692 if self.rep_channel.is_alive():
674 693 self.rep_channel.stop()
675 if self.hb_channel.is_alive():
676 self.hb_channel.stop()
677 694
678 695 @property
679 696 def channels_running(self):
680 697 """Are any of the channels created and running?"""
681 698 return self.xreq_channel.is_alive() \
682 699 or self.sub_channel.is_alive() \
683 or self.rep_channel.is_alive() \
684 or self.hb_channel.is_alive()
700 or self.rep_channel.is_alive()
685 701
686 702 #--------------------------------------------------------------------------
687 703 # Kernel process management methods:
General Comments 0
You need to be logged in to leave comments. Login now