Show More
@@ -515,10 +515,13 b' class HBSocketChannel(ZmqSocketChannel):' | |||||
515 | time_to_dead = 3.0 |
|
515 | time_to_dead = 3.0 | |
516 | socket = None |
|
516 | socket = None | |
517 | poller = None |
|
517 | poller = None | |
|
518 | _running = None | |||
|
519 | _pause = None | |||
518 |
|
520 | |||
519 | def __init__(self, context, session, address): |
|
521 | def __init__(self, context, session, address): | |
520 | super(HBSocketChannel, self).__init__(context, session, address) |
|
522 | super(HBSocketChannel, self).__init__(context, session, address) | |
521 | self._running = False |
|
523 | self._running = False | |
|
524 | self._pause = False | |||
522 |
|
525 | |||
523 | def _create_socket(self): |
|
526 | def _create_socket(self): | |
524 | self.socket = self.context.socket(zmq.REQ) |
|
527 | self.socket = self.context.socket(zmq.REQ) | |
@@ -538,51 +541,69 b' class HBSocketChannel(ZmqSocketChannel):' | |||||
538 | # after the sockets are connected. |
|
541 | # after the sockets are connected. | |
539 | time.sleep(2.0) |
|
542 | time.sleep(2.0) | |
540 | while self._running: |
|
543 | while self._running: | |
541 | since_last_heartbeat = 0.0 |
|
544 | if self._pause: | |
542 | request_time = time.time() |
|
545 | time.sleep(self.time_to_dead) | |
543 | try: |
|
|||
544 | #io.rprint('Ping from HB channel') # dbg |
|
|||
545 | self.socket.send_json('ping') |
|
|||
546 | except zmq.ZMQError, e: |
|
|||
547 | #io.rprint('*** HB Error:', e) # dbg |
|
|||
548 | if e.errno == zmq.EFSM: |
|
|||
549 | #io.rprint('sleep...', self.time_to_dead) # dbg |
|
|||
550 | time.sleep(self.time_to_dead) |
|
|||
551 | self._create_socket() |
|
|||
552 | else: |
|
|||
553 | raise |
|
|||
554 | else: |
|
546 | else: | |
555 | while True: |
|
547 | since_last_heartbeat = 0.0 | |
556 | try: |
|
548 | request_time = time.time() | |
557 | self.socket.recv_json(zmq.NOBLOCK) |
|
549 | try: | |
558 | except zmq.ZMQError, e: |
|
550 | #io.rprint('Ping from HB channel') # dbg | |
559 | #io.rprint('*** HB Error 2:', e) # dbg |
|
551 | self.socket.send_json('ping') | |
560 | if e.errno == zmq.EAGAIN: |
|
552 | except zmq.ZMQError, e: | |
561 | before_poll = time.time() |
|
553 | #io.rprint('*** HB Error:', e) # dbg | |
562 | until_dead = self.time_to_dead - (before_poll - |
|
554 | if e.errno == zmq.EFSM: | |
563 | request_time) |
|
555 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
564 |
|
556 | time.sleep(self.time_to_dead) | ||
565 | # When the return value of poll() is an empty list, |
|
557 | self._create_socket() | |
566 | # that is when things have gone wrong (zeromq bug). |
|
558 | else: | |
567 | # As long as it is not an empty list, poll is |
|
559 | raise | |
568 | # working correctly even if it returns quickly. |
|
560 | else: | |
569 | # Note: poll timeout is in milliseconds. |
|
561 | while True: | |
570 | self.poller.poll(1000*until_dead) |
|
562 | try: | |
|
563 | self.socket.recv_json(zmq.NOBLOCK) | |||
|
564 | except zmq.ZMQError, e: | |||
|
565 | #io.rprint('*** HB Error 2:', e) # dbg | |||
|
566 | if e.errno == zmq.EAGAIN: | |||
|
567 | before_poll = time.time() | |||
|
568 | until_dead = self.time_to_dead - (before_poll - | |||
|
569 | request_time) | |||
|
570 | ||||
|
571 | # When the return value of poll() is an empty list, | |||
|
572 | # that is when things have gone wrong (zeromq bug). | |||
|
573 | # As long as it is not an empty list, poll is | |||
|
574 | # working correctly even if it returns quickly. | |||
|
575 | # Note: poll timeout is in milliseconds. | |||
|
576 | self.poller.poll(1000*until_dead) | |||
571 |
|
577 | |||
572 | since_last_heartbeat = time.time() - request_time |
|
578 | since_last_heartbeat = time.time() - request_time | |
573 | if since_last_heartbeat > self.time_to_dead: |
|
579 | if since_last_heartbeat > self.time_to_dead: | |
574 | self.call_handlers(since_last_heartbeat) |
|
580 | self.call_handlers(since_last_heartbeat) | |
575 | break |
|
581 | break | |
|
582 | else: | |||
|
583 | # FIXME: We should probably log this instead. | |||
|
584 | raise | |||
576 | else: |
|
585 | else: | |
577 | # FIXME: We should probably log this instead. |
|
586 | until_dead = self.time_to_dead - (time.time() - | |
578 | raise |
|
587 | request_time) | |
579 |
|
|
588 | if until_dead > 0.0: | |
580 |
|
|
589 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
581 | request_time) |
|
590 | time.sleep(until_dead) | |
582 |
|
|
591 | break | |
583 | #io.rprint('sleep...', self.time_to_dead) # dbg |
|
592 | ||
584 | time.sleep(until_dead) |
|
593 | def pause(self): | |
585 | break |
|
594 | """Pause the heartbeat.""" | |
|
595 | self._pause = True | |||
|
596 | ||||
|
597 | def unpause(self): | |||
|
598 | """Unpause the heartbeat.""" | |||
|
599 | self._pause = False | |||
|
600 | ||||
|
601 | def is_beating(self): | |||
|
602 | """Is the heartbeat running and not paused.""" | |||
|
603 | if self.is_alive() and not self._pause: | |||
|
604 | return True | |||
|
605 | else: | |||
|
606 | return False | |||
586 |
|
607 | |||
587 | def stop(self): |
|
608 | def stop(self): | |
588 | self._running = False |
|
609 | self._running = False | |
@@ -646,8 +667,8 b' class KernelManager(HasTraits):' | |||||
646 | # Channel management methods: |
|
667 | # Channel management methods: | |
647 | #-------------------------------------------------------------------------- |
|
668 | #-------------------------------------------------------------------------- | |
648 |
|
669 | |||
649 |
def start_channels(self, xreq=True, sub=True, rep=True |
|
670 | def start_channels(self, xreq=True, sub=True, rep=True): | |
650 | """Starts the channels for this kernel. |
|
671 | """Starts the channels for this kernel, but not the heartbeat. | |
651 |
|
672 | |||
652 | This will create the channels if they do not exist and then start |
|
673 | This will create the channels if they do not exist and then start | |
653 | them. If port numbers of 0 are being used (random ports) then you |
|
674 | them. If port numbers of 0 are being used (random ports) then you | |
@@ -660,8 +681,6 b' class KernelManager(HasTraits):' | |||||
660 | self.sub_channel.start() |
|
681 | self.sub_channel.start() | |
661 | if rep: |
|
682 | if rep: | |
662 | self.rep_channel.start() |
|
683 | self.rep_channel.start() | |
663 | if hb: |
|
|||
664 | self.hb_channel.start() |
|
|||
665 |
|
684 | |||
666 | def stop_channels(self): |
|
685 | def stop_channels(self): | |
667 | """Stops all the running channels for this kernel. |
|
686 | """Stops all the running channels for this kernel. | |
@@ -672,16 +691,13 b' class KernelManager(HasTraits):' | |||||
672 | self.sub_channel.stop() |
|
691 | self.sub_channel.stop() | |
673 | if self.rep_channel.is_alive(): |
|
692 | if self.rep_channel.is_alive(): | |
674 | self.rep_channel.stop() |
|
693 | self.rep_channel.stop() | |
675 | if self.hb_channel.is_alive(): |
|
|||
676 | self.hb_channel.stop() |
|
|||
677 |
|
694 | |||
678 | @property |
|
695 | @property | |
679 | def channels_running(self): |
|
696 | def channels_running(self): | |
680 | """Are any of the channels created and running?""" |
|
697 | """Are any of the channels created and running?""" | |
681 | return self.xreq_channel.is_alive() \ |
|
698 | return self.xreq_channel.is_alive() \ | |
682 | or self.sub_channel.is_alive() \ |
|
699 | or self.sub_channel.is_alive() \ | |
683 |
or self.rep_channel.is_alive() |
|
700 | or self.rep_channel.is_alive() | |
684 | or self.hb_channel.is_alive() |
|
|||
685 |
|
701 | |||
686 | #-------------------------------------------------------------------------- |
|
702 | #-------------------------------------------------------------------------- | |
687 | # Kernel process management methods: |
|
703 | # Kernel process management methods: |
General Comments 0
You need to be logged in to leave comments.
Login now