Show More
@@ -515,10 +515,13 b' class HBSocketChannel(ZmqSocketChannel):' | |||
|
515 | 515 | time_to_dead = 3.0 |
|
516 | 516 | socket = None |
|
517 | 517 | poller = None |
|
518 | _running = None | |
|
519 | _pause = None | |
|
518 | 520 | |
|
519 | 521 | def __init__(self, context, session, address): |
|
520 | 522 | super(HBSocketChannel, self).__init__(context, session, address) |
|
521 | 523 | self._running = False |
|
524 | self._pause = False | |
|
522 | 525 | |
|
523 | 526 | def _create_socket(self): |
|
524 | 527 | self.socket = self.context.socket(zmq.REQ) |
@@ -538,51 +541,69 b' class HBSocketChannel(ZmqSocketChannel):' | |||
|
538 | 541 | # after the sockets are connected. |
|
539 | 542 | time.sleep(2.0) |
|
540 | 543 | while self._running: |
|
541 | since_last_heartbeat = 0.0 | |
|
542 | request_time = time.time() | |
|
543 | try: | |
|
544 | #io.rprint('Ping from HB channel') # dbg | |
|
545 | self.socket.send_json('ping') | |
|
546 | except zmq.ZMQError, e: | |
|
547 | #io.rprint('*** HB Error:', e) # dbg | |
|
548 | if e.errno == zmq.EFSM: | |
|
549 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
550 | time.sleep(self.time_to_dead) | |
|
551 | self._create_socket() | |
|
552 | else: | |
|
553 | raise | |
|
544 | if self._pause: | |
|
545 | time.sleep(self.time_to_dead) | |
|
554 | 546 | else: |
|
555 | while True: | |
|
556 | try: | |
|
557 | self.socket.recv_json(zmq.NOBLOCK) | |
|
558 | except zmq.ZMQError, e: | |
|
559 | #io.rprint('*** HB Error 2:', e) # dbg | |
|
560 | if e.errno == zmq.EAGAIN: | |
|
561 | before_poll = time.time() | |
|
562 | until_dead = self.time_to_dead - (before_poll - | |
|
563 | request_time) | |
|
564 | ||
|
565 | # When the return value of poll() is an empty list, | |
|
566 | # that is when things have gone wrong (zeromq bug). | |
|
567 | # As long as it is not an empty list, poll is | |
|
568 | # working correctly even if it returns quickly. | |
|
569 | # Note: poll timeout is in milliseconds. | |
|
570 | self.poller.poll(1000*until_dead) | |
|
547 | since_last_heartbeat = 0.0 | |
|
548 | request_time = time.time() | |
|
549 | try: | |
|
550 | #io.rprint('Ping from HB channel') # dbg | |
|
551 | self.socket.send_json('ping') | |
|
552 | except zmq.ZMQError, e: | |
|
553 | #io.rprint('*** HB Error:', e) # dbg | |
|
554 | if e.errno == zmq.EFSM: | |
|
555 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
556 | time.sleep(self.time_to_dead) | |
|
557 | self._create_socket() | |
|
558 | else: | |
|
559 | raise | |
|
560 | else: | |
|
561 | while True: | |
|
562 | try: | |
|
563 | self.socket.recv_json(zmq.NOBLOCK) | |
|
564 | except zmq.ZMQError, e: | |
|
565 | #io.rprint('*** HB Error 2:', e) # dbg | |
|
566 | if e.errno == zmq.EAGAIN: | |
|
567 | before_poll = time.time() | |
|
568 | until_dead = self.time_to_dead - (before_poll - | |
|
569 | request_time) | |
|
570 | ||
|
571 | # When the return value of poll() is an empty list, | |
|
572 | # that is when things have gone wrong (zeromq bug). | |
|
573 | # As long as it is not an empty list, poll is | |
|
574 | # working correctly even if it returns quickly. | |
|
575 | # Note: poll timeout is in milliseconds. | |
|
576 | self.poller.poll(1000*until_dead) | |
|
571 | 577 | |
|
572 | since_last_heartbeat = time.time() - request_time | |
|
573 | if since_last_heartbeat > self.time_to_dead: | |
|
574 | self.call_handlers(since_last_heartbeat) | |
|
575 | break | |
|
578 | since_last_heartbeat = time.time() - request_time | |
|
579 | if since_last_heartbeat > self.time_to_dead: | |
|
580 | self.call_handlers(since_last_heartbeat) | |
|
581 | break | |
|
582 | else: | |
|
583 | # FIXME: We should probably log this instead. | |
|
584 | raise | |
|
576 | 585 | else: |
|
577 | # FIXME: We should probably log this instead. | |
|
578 | raise | |
|
579 |
|
|
|
580 |
|
|
|
581 | request_time) | |
|
582 |
|
|
|
583 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
584 | time.sleep(until_dead) | |
|
585 | break | |
|
586 | until_dead = self.time_to_dead - (time.time() - | |
|
587 | request_time) | |
|
588 | if until_dead > 0.0: | |
|
589 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
590 | time.sleep(until_dead) | |
|
591 | break | |
|
592 | ||
|
593 | def pause(self): | |
|
594 | """Pause the heartbeat.""" | |
|
595 | self._pause = True | |
|
596 | ||
|
597 | def unpause(self): | |
|
598 | """Unpause the heartbeat.""" | |
|
599 | self._pause = False | |
|
600 | ||
|
601 | def is_beating(self): | |
|
602 | """Is the heartbeat running and not paused.""" | |
|
603 | if self.is_alive() and not self._pause: | |
|
604 | return True | |
|
605 | else: | |
|
606 | return False | |
|
586 | 607 | |
|
587 | 608 | def stop(self): |
|
588 | 609 | self._running = False |
@@ -646,8 +667,8 b' class KernelManager(HasTraits):' | |||
|
646 | 667 | # Channel management methods: |
|
647 | 668 | #-------------------------------------------------------------------------- |
|
648 | 669 | |
|
649 |
def start_channels(self, xreq=True, sub=True, rep=True |
|
|
650 | """Starts the channels for this kernel. | |
|
670 | def start_channels(self, xreq=True, sub=True, rep=True): | |
|
671 | """Starts the channels for this kernel, but not the heartbeat. | |
|
651 | 672 | |
|
652 | 673 | This will create the channels if they do not exist and then start |
|
653 | 674 | them. If port numbers of 0 are being used (random ports) then you |
@@ -660,8 +681,6 b' class KernelManager(HasTraits):' | |||
|
660 | 681 | self.sub_channel.start() |
|
661 | 682 | if rep: |
|
662 | 683 | self.rep_channel.start() |
|
663 | if hb: | |
|
664 | self.hb_channel.start() | |
|
665 | 684 | |
|
666 | 685 | def stop_channels(self): |
|
667 | 686 | """Stops all the running channels for this kernel. |
@@ -672,16 +691,13 b' class KernelManager(HasTraits):' | |||
|
672 | 691 | self.sub_channel.stop() |
|
673 | 692 | if self.rep_channel.is_alive(): |
|
674 | 693 | self.rep_channel.stop() |
|
675 | if self.hb_channel.is_alive(): | |
|
676 | self.hb_channel.stop() | |
|
677 | 694 | |
|
678 | 695 | @property |
|
679 | 696 | def channels_running(self): |
|
680 | 697 | """Are any of the channels created and running?""" |
|
681 | 698 | return self.xreq_channel.is_alive() \ |
|
682 | 699 | or self.sub_channel.is_alive() \ |
|
683 |
or self.rep_channel.is_alive() |
|
|
684 | or self.hb_channel.is_alive() | |
|
700 | or self.rep_channel.is_alive() | |
|
685 | 701 | |
|
686 | 702 | #-------------------------------------------------------------------------- |
|
687 | 703 | # Kernel process management methods: |
General Comments 0
You need to be logged in to leave comments.
Login now