##// END OF EJS Templates
Fixes to the heartbeat channel...
MinRK -
Show More
@@ -134,11 +134,13 b' class BlockingStdInSocketChannel(StdInSocketChannel):'
134 134
135 135 class BlockingHBSocketChannel(HBSocketChannel):
136 136
137 # This kernel needs rapid monitoring capabilities
138 time_to_dead = 0.2
137 # This kernel needs quicker monitoring, shorten to 1 sec.
138 # less than 0.5s is unreliable, and will get occasional
139 # false reports of missed beats.
140 time_to_dead = 1.
139 141
140 142 def call_handlers(self, since_last_heartbeat):
141 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
143 """pause beating on missed heartbeat"""
142 144 pass
143 145
144 146
@@ -471,83 +471,89 b' class HBSocketChannel(ZMQSocketChannel):'
471 471 poller = None
472 472 _running = None
473 473 _pause = None
474 _beating = None
474 475
475 476 def __init__(self, context, session, address):
476 477 super(HBSocketChannel, self).__init__(context, session, address)
477 478 self._running = False
478 self._pause = True
479 self._pause =True
480 self.poller = zmq.Poller()
479 481
480 482 def _create_socket(self):
483 if self.socket is not None:
484 # close previous socket, before opening a new one
485 self.poller.unregister(self.socket)
486 self.socket.close()
481 487 self.socket = self.context.socket(zmq.REQ)
482 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
488 self.socket.setsockopt(zmq.LINGER, 0)
483 489 self.socket.connect('tcp://%s:%i' % self.address)
484 self.poller = zmq.Poller()
490
485 491 self.poller.register(self.socket, zmq.POLLIN)
492
493 def _poll(self, start_time):
494 """poll for heartbeat replies until we reach self.time_to_dead
495
496 Ignores interrupts, and returns the result of poll(), which
497 will be an empty list if no messages arrived before the timeout,
498 or the event tuple if there is a message to receive.
499 """
500
501 until_dead = self.time_to_dead - (time.time() - start_time)
502 # ensure poll at least once
503 until_dead = max(until_dead, 1e-3)
504 events = []
505 while True:
506 try:
507 events = self.poller.poll(1000 * until_dead)
508 except zmq.ZMQError as e:
509 if e.errno == errno.EINTR:
510 # ignore interrupts during heartbeat
511 # this may never actually happen
512 until_dead = self.time_to_dead - (time.time() - start_time)
513 until_dead = max(until_dead, 1e-3)
514 pass
515 else:
516 raise
517 else:
518 break
519 return events
486 520
487 521 def run(self):
488 522 """The thread's main activity. Call start() instead."""
489 523 self._create_socket()
490 524 self._running = True
525 self._beating = True
526
491 527 while self._running:
492 528 if self._pause:
529 # just sleep, and skip the rest of the loop
493 530 time.sleep(self.time_to_dead)
531 continue
532
533 since_last_heartbeat = 0.0
534 # io.rprint('Ping from HB channel') # dbg
535 # no need to catch EFSM here, because the previous event was
536 # either a recv or connect, which cannot be followed by EFSM
537 self.socket.send(b'ping')
538 request_time = time.time()
539 ready = self._poll(request_time)
540 if ready:
541 self._beating = True
542 # the poll above guarantees we have something to recv
543 self.socket.recv()
544 # sleep the remainder of the cycle
545 remainder = self.time_to_dead - (time.time() - request_time)
546 if remainder > 0:
547 time.sleep(remainder)
548 continue
494 549 else:
495 since_last_heartbeat = 0.0
496 request_time = time.time()
497 try:
498 #io.rprint('Ping from HB channel') # dbg
499 self.socket.send(b'ping')
500 except zmq.ZMQError, e:
501 #io.rprint('*** HB Error:', e) # dbg
502 if e.errno == zmq.EFSM:
503 #io.rprint('sleep...', self.time_to_dead) # dbg
504 time.sleep(self.time_to_dead)
505 self._create_socket()
506 else:
507 raise
508 else:
509 while True:
510 try:
511 self.socket.recv(zmq.NOBLOCK)
512 except zmq.ZMQError, e:
513 #io.rprint('*** HB Error 2:', e) # dbg
514 if e.errno == zmq.EAGAIN:
515 before_poll = time.time()
516 until_dead = self.time_to_dead - (before_poll -
517 request_time)
518
519 # When the return value of poll() is an empty
520 # list, that is when things have gone wrong
521 # (zeromq bug). As long as it is not an empty
522 # list, poll is working correctly even if it
523 # returns quickly. Note: poll timeout is in
524 # milliseconds.
525 if until_dead > 0.0:
526 while True:
527 try:
528 self.poller.poll(1000 * until_dead)
529 except zmq.ZMQError as e:
530 if e.errno == errno.EINTR:
531 continue
532 else:
533 raise
534 else:
535 break
536
537 since_last_heartbeat = time.time()-request_time
538 if since_last_heartbeat > self.time_to_dead:
539 self.call_handlers(since_last_heartbeat)
540 break
541 else:
542 # FIXME: We should probably log this instead.
543 raise
544 else:
545 until_dead = self.time_to_dead - (time.time() -
546 request_time)
547 if until_dead > 0.0:
548 #io.rprint('sleep...', self.time_to_dead) # dbg
549 time.sleep(until_dead)
550 break
550 # nothing was received within the time limit, signal heart failure
551 self._beating = False
552 since_last_heartbeat = time.time() - request_time
553 self.call_handlers(since_last_heartbeat)
554 # and close/reopen the socket, because the REQ/REP cycle has been broken
555 self._create_socket()
556 continue
551 557
552 558 def pause(self):
553 559 """Pause the heartbeat."""
@@ -558,8 +564,8 b' class HBSocketChannel(ZMQSocketChannel):'
558 564 self._pause = False
559 565
560 566 def is_beating(self):
561 """Is the heartbeat running and not paused."""
562 if self.is_alive() and not self._pause:
567 """Is the heartbeat running and responsive (and not paused)."""
568 if self.is_alive() and not self._pause and self._beating:
563 569 return True
564 570 else:
565 571 return False
@@ -573,7 +579,7 b' class HBSocketChannel(ZMQSocketChannel):'
573 579
574 580 Subclasses should override this method to handle incoming messages.
575 581 It is important to remember that this method is called in the thread
576 so that some logic must be done to ensure that the application leve
582 so that some logic must be done to ensure that the application level
577 583 handlers are called in the application thread.
578 584 """
579 585 raise NotImplementedError('call_handlers must be defined in a subclass.')
@@ -900,16 +906,18 b' class KernelManager(HasTraits):'
900 906 @property
901 907 def is_alive(self):
902 908 """Is the kernel process still running?"""
903 # FIXME: not using a heartbeat means this method is broken for any
904 # remote kernel, it's only capable of handling local kernels.
905 909 if self.has_kernel:
906 910 if self.kernel.poll() is None:
907 911 return True
908 912 else:
909 913 return False
914 elif self._hb_channel is not None:
915 # We didn't start the kernel with this KernelManager so we
916 # use the heartbeat.
917 return self._hb_channel.is_beating()
910 918 else:
911 # We didn't start the kernel with this KernelManager so we don't
912 # know if it is running. We should use a heartbeat for this case.
919 # no heartbeat and not local, we can't tell if it's running,
920 # so naively return True
913 921 return True
914 922
915 923 #--------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now