Show More
@@ -134,11 +134,13 b' class BlockingStdInSocketChannel(StdInSocketChannel):' | |||||
134 |
|
134 | |||
135 | class BlockingHBSocketChannel(HBSocketChannel): |
|
135 | class BlockingHBSocketChannel(HBSocketChannel): | |
136 |
|
136 | |||
137 |
# This kernel needs r |
|
137 | # This kernel needs quicker monitoring, shorten to 1 sec. | |
138 | time_to_dead = 0.2 |
|
138 | # less than 0.5s is unreliable, and will get occasional | |
|
139 | # false reports of missed beats. | |||
|
140 | time_to_dead = 1. | |||
139 |
|
141 | |||
140 | def call_handlers(self, since_last_heartbeat): |
|
142 | def call_handlers(self, since_last_heartbeat): | |
141 | #io.rprint('[[Heart]]', since_last_heartbeat) # dbg |
|
143 | """pause beating on missed heartbeat""" | |
142 | pass |
|
144 | pass | |
143 |
|
145 | |||
144 |
|
146 |
@@ -471,83 +471,89 b' class HBSocketChannel(ZMQSocketChannel):' | |||||
471 | poller = None |
|
471 | poller = None | |
472 | _running = None |
|
472 | _running = None | |
473 | _pause = None |
|
473 | _pause = None | |
|
474 | _beating = None | |||
474 |
|
475 | |||
475 | def __init__(self, context, session, address): |
|
476 | def __init__(self, context, session, address): | |
476 | super(HBSocketChannel, self).__init__(context, session, address) |
|
477 | super(HBSocketChannel, self).__init__(context, session, address) | |
477 | self._running = False |
|
478 | self._running = False | |
478 |
self._pause = |
|
479 | self._pause =True | |
|
480 | self.poller = zmq.Poller() | |||
479 |
|
481 | |||
480 | def _create_socket(self): |
|
482 | def _create_socket(self): | |
|
483 | if self.socket is not None: | |||
|
484 | # close previous socket, before opening a new one | |||
|
485 | self.poller.unregister(self.socket) | |||
|
486 | self.socket.close() | |||
481 | self.socket = self.context.socket(zmq.REQ) |
|
487 | self.socket = self.context.socket(zmq.REQ) | |
482 |
self.socket.setsockopt(zmq. |
|
488 | self.socket.setsockopt(zmq.LINGER, 0) | |
483 | self.socket.connect('tcp://%s:%i' % self.address) |
|
489 | self.socket.connect('tcp://%s:%i' % self.address) | |
484 | self.poller = zmq.Poller() |
|
490 | ||
485 | self.poller.register(self.socket, zmq.POLLIN) |
|
491 | self.poller.register(self.socket, zmq.POLLIN) | |
|
492 | ||||
|
493 | def _poll(self, start_time): | |||
|
494 | """poll for heartbeat replies until we reach self.time_to_dead | |||
|
495 | ||||
|
496 | Ignores interrupts, and returns the result of poll(), which | |||
|
497 | will be an empty list if no messages arrived before the timeout, | |||
|
498 | or the event tuple if there is a message to receive. | |||
|
499 | """ | |||
|
500 | ||||
|
501 | until_dead = self.time_to_dead - (time.time() - start_time) | |||
|
502 | # ensure poll at least once | |||
|
503 | until_dead = max(until_dead, 1e-3) | |||
|
504 | events = [] | |||
|
505 | while True: | |||
|
506 | try: | |||
|
507 | events = self.poller.poll(1000 * until_dead) | |||
|
508 | except zmq.ZMQError as e: | |||
|
509 | if e.errno == errno.EINTR: | |||
|
510 | # ignore interrupts during heartbeat | |||
|
511 | # this may never actually happen | |||
|
512 | until_dead = self.time_to_dead - (time.time() - start_time) | |||
|
513 | until_dead = max(until_dead, 1e-3) | |||
|
514 | pass | |||
|
515 | else: | |||
|
516 | raise | |||
|
517 | else: | |||
|
518 | break | |||
|
519 | return events | |||
486 |
|
520 | |||
487 | def run(self): |
|
521 | def run(self): | |
488 | """The thread's main activity. Call start() instead.""" |
|
522 | """The thread's main activity. Call start() instead.""" | |
489 | self._create_socket() |
|
523 | self._create_socket() | |
490 | self._running = True |
|
524 | self._running = True | |
|
525 | self._beating = True | |||
|
526 | ||||
491 | while self._running: |
|
527 | while self._running: | |
492 | if self._pause: |
|
528 | if self._pause: | |
|
529 | # just sleep, and skip the rest of the loop | |||
493 | time.sleep(self.time_to_dead) |
|
530 | time.sleep(self.time_to_dead) | |
|
531 | continue | |||
|
532 | ||||
|
533 | since_last_heartbeat = 0.0 | |||
|
534 | # io.rprint('Ping from HB channel') # dbg | |||
|
535 | # no need to catch EFSM here, because the previous event was | |||
|
536 | # either a recv or connect, which cannot be followed by EFSM | |||
|
537 | self.socket.send(b'ping') | |||
|
538 | request_time = time.time() | |||
|
539 | ready = self._poll(request_time) | |||
|
540 | if ready: | |||
|
541 | self._beating = True | |||
|
542 | # the poll above guarantees we have something to recv | |||
|
543 | self.socket.recv() | |||
|
544 | # sleep the remainder of the cycle | |||
|
545 | remainder = self.time_to_dead - (time.time() - request_time) | |||
|
546 | if remainder > 0: | |||
|
547 | time.sleep(remainder) | |||
|
548 | continue | |||
494 | else: |
|
549 | else: | |
495 | since_last_heartbeat = 0.0 |
|
550 | # nothing was received within the time limit, signal heart failure | |
496 |
|
|
551 | self._beating = False | |
497 | try: |
|
552 | since_last_heartbeat = time.time() - request_time | |
498 | #io.rprint('Ping from HB channel') # dbg |
|
553 | self.call_handlers(since_last_heartbeat) | |
499 | self.socket.send(b'ping') |
|
554 | # and close/reopen the socket, because the REQ/REP cycle has been broken | |
500 | except zmq.ZMQError, e: |
|
555 | self._create_socket() | |
501 | #io.rprint('*** HB Error:', e) # dbg |
|
556 | continue | |
502 | if e.errno == zmq.EFSM: |
|
|||
503 | #io.rprint('sleep...', self.time_to_dead) # dbg |
|
|||
504 | time.sleep(self.time_to_dead) |
|
|||
505 | self._create_socket() |
|
|||
506 | else: |
|
|||
507 | raise |
|
|||
508 | else: |
|
|||
509 | while True: |
|
|||
510 | try: |
|
|||
511 | self.socket.recv(zmq.NOBLOCK) |
|
|||
512 | except zmq.ZMQError, e: |
|
|||
513 | #io.rprint('*** HB Error 2:', e) # dbg |
|
|||
514 | if e.errno == zmq.EAGAIN: |
|
|||
515 | before_poll = time.time() |
|
|||
516 | until_dead = self.time_to_dead - (before_poll - |
|
|||
517 | request_time) |
|
|||
518 |
|
||||
519 | # When the return value of poll() is an empty |
|
|||
520 | # list, that is when things have gone wrong |
|
|||
521 | # (zeromq bug). As long as it is not an empty |
|
|||
522 | # list, poll is working correctly even if it |
|
|||
523 | # returns quickly. Note: poll timeout is in |
|
|||
524 | # milliseconds. |
|
|||
525 | if until_dead > 0.0: |
|
|||
526 | while True: |
|
|||
527 | try: |
|
|||
528 | self.poller.poll(1000 * until_dead) |
|
|||
529 | except zmq.ZMQError as e: |
|
|||
530 | if e.errno == errno.EINTR: |
|
|||
531 | continue |
|
|||
532 | else: |
|
|||
533 | raise |
|
|||
534 | else: |
|
|||
535 | break |
|
|||
536 |
|
||||
537 | since_last_heartbeat = time.time()-request_time |
|
|||
538 | if since_last_heartbeat > self.time_to_dead: |
|
|||
539 | self.call_handlers(since_last_heartbeat) |
|
|||
540 | break |
|
|||
541 | else: |
|
|||
542 | # FIXME: We should probably log this instead. |
|
|||
543 | raise |
|
|||
544 | else: |
|
|||
545 | until_dead = self.time_to_dead - (time.time() - |
|
|||
546 | request_time) |
|
|||
547 | if until_dead > 0.0: |
|
|||
548 | #io.rprint('sleep...', self.time_to_dead) # dbg |
|
|||
549 | time.sleep(until_dead) |
|
|||
550 | break |
|
|||
551 |
|
557 | |||
552 | def pause(self): |
|
558 | def pause(self): | |
553 | """Pause the heartbeat.""" |
|
559 | """Pause the heartbeat.""" | |
@@ -558,8 +564,8 b' class HBSocketChannel(ZMQSocketChannel):' | |||||
558 | self._pause = False |
|
564 | self._pause = False | |
559 |
|
565 | |||
560 | def is_beating(self): |
|
566 | def is_beating(self): | |
561 | """Is the heartbeat running and not paused.""" |
|
567 | """Is the heartbeat running and responsive (and not paused).""" | |
562 | if self.is_alive() and not self._pause: |
|
568 | if self.is_alive() and not self._pause and self._beating: | |
563 | return True |
|
569 | return True | |
564 | else: |
|
570 | else: | |
565 | return False |
|
571 | return False | |
@@ -573,7 +579,7 b' class HBSocketChannel(ZMQSocketChannel):' | |||||
573 |
|
579 | |||
574 | Subclasses should override this method to handle incoming messages. |
|
580 | Subclasses should override this method to handle incoming messages. | |
575 | It is important to remember that this method is called in the thread |
|
581 | It is important to remember that this method is called in the thread | |
576 | so that some logic must be done to ensure that the application leve |
|
582 | so that some logic must be done to ensure that the application level | |
577 | handlers are called in the application thread. |
|
583 | handlers are called in the application thread. | |
578 | """ |
|
584 | """ | |
579 | raise NotImplementedError('call_handlers must be defined in a subclass.') |
|
585 | raise NotImplementedError('call_handlers must be defined in a subclass.') | |
@@ -900,16 +906,18 b' class KernelManager(HasTraits):' | |||||
900 | @property |
|
906 | @property | |
901 | def is_alive(self): |
|
907 | def is_alive(self): | |
902 | """Is the kernel process still running?""" |
|
908 | """Is the kernel process still running?""" | |
903 | # FIXME: not using a heartbeat means this method is broken for any |
|
|||
904 | # remote kernel, it's only capable of handling local kernels. |
|
|||
905 | if self.has_kernel: |
|
909 | if self.has_kernel: | |
906 | if self.kernel.poll() is None: |
|
910 | if self.kernel.poll() is None: | |
907 | return True |
|
911 | return True | |
908 | else: |
|
912 | else: | |
909 | return False |
|
913 | return False | |
|
914 | elif self._hb_channel is not None: | |||
|
915 | # We didn't start the kernel with this KernelManager so we | |||
|
916 | # use the heartbeat. | |||
|
917 | return self._hb_channel.is_beating() | |||
910 | else: |
|
918 | else: | |
911 | # We didn't start the kernel with this KernelManager so we don't |
|
919 | # no heartbeat and not local, we can't tell if it's running, | |
912 | # know if it is running. We should use a heartbeat for this case. |
|
920 | # so naively return True | |
913 | return True |
|
921 | return True | |
914 |
|
922 | |||
915 | #-------------------------------------------------------------------------- |
|
923 | #-------------------------------------------------------------------------- |
General Comments 0
You need to be logged in to leave comments.
Login now