##// END OF EJS Templates
Fixes to the heartbeat channel...
MinRK -
Show More
@@ -134,11 +134,13 b' class BlockingStdInSocketChannel(StdInSocketChannel):'
134
134
135 class BlockingHBSocketChannel(HBSocketChannel):
135 class BlockingHBSocketChannel(HBSocketChannel):
136
136
137 # This kernel needs rapid monitoring capabilities
137 # This kernel needs quicker monitoring, shorten to 1 sec.
138 time_to_dead = 0.2
138 # less than 0.5s is unreliable, and will get occasional
139 # false reports of missed beats.
140 time_to_dead = 1.
139
141
140 def call_handlers(self, since_last_heartbeat):
142 def call_handlers(self, since_last_heartbeat):
141 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
143 """pause beating on missed heartbeat"""
142 pass
144 pass
143
145
144
146
@@ -471,83 +471,89 b' class HBSocketChannel(ZMQSocketChannel):'
471 poller = None
471 poller = None
472 _running = None
472 _running = None
473 _pause = None
473 _pause = None
474 _beating = None
474
475
475 def __init__(self, context, session, address):
476 def __init__(self, context, session, address):
476 super(HBSocketChannel, self).__init__(context, session, address)
477 super(HBSocketChannel, self).__init__(context, session, address)
477 self._running = False
478 self._running = False
478 self._pause = True
479 self._pause =True
480 self.poller = zmq.Poller()
479
481
480 def _create_socket(self):
482 def _create_socket(self):
483 if self.socket is not None:
484 # close previous socket, before opening a new one
485 self.poller.unregister(self.socket)
486 self.socket.close()
481 self.socket = self.context.socket(zmq.REQ)
487 self.socket = self.context.socket(zmq.REQ)
482 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
488 self.socket.setsockopt(zmq.LINGER, 0)
483 self.socket.connect('tcp://%s:%i' % self.address)
489 self.socket.connect('tcp://%s:%i' % self.address)
484 self.poller = zmq.Poller()
490
485 self.poller.register(self.socket, zmq.POLLIN)
491 self.poller.register(self.socket, zmq.POLLIN)
486
492
493 def _poll(self, start_time):
494 """poll for heartbeat replies until we reach self.time_to_dead
495
496 Ignores interrupts, and returns the result of poll(), which
497 will be an empty list if no messages arrived before the timeout,
498 or the event tuple if there is a message to receive.
499 """
500
501 until_dead = self.time_to_dead - (time.time() - start_time)
502 # ensure poll at least once
503 until_dead = max(until_dead, 1e-3)
504 events = []
505 while True:
506 try:
507 events = self.poller.poll(1000 * until_dead)
508 except zmq.ZMQError as e:
509 if e.errno == errno.EINTR:
510 # ignore interrupts during heartbeat
511 # this may never actually happen
512 until_dead = self.time_to_dead - (time.time() - start_time)
513 until_dead = max(until_dead, 1e-3)
514 pass
515 else:
516 raise
517 else:
518 break
519 return events
520
487 def run(self):
521 def run(self):
488 """The thread's main activity. Call start() instead."""
522 """The thread's main activity. Call start() instead."""
489 self._create_socket()
523 self._create_socket()
490 self._running = True
524 self._running = True
525 self._beating = True
526
491 while self._running:
527 while self._running:
492 if self._pause:
528 if self._pause:
529 # just sleep, and skip the rest of the loop
493 time.sleep(self.time_to_dead)
530 time.sleep(self.time_to_dead)
494 else:
531 continue
532
495 since_last_heartbeat = 0.0
533 since_last_heartbeat = 0.0
496 request_time = time.time()
497 try:
498 #io.rprint('Ping from HB channel') # dbg
534 # io.rprint('Ping from HB channel') # dbg
535 # no need to catch EFSM here, because the previous event was
536 # either a recv or connect, which cannot be followed by EFSM
499 self.socket.send(b'ping')
537 self.socket.send(b'ping')
500 except zmq.ZMQError, e:
538 request_time = time.time()
501 #io.rprint('*** HB Error:', e) # dbg
539 ready = self._poll(request_time)
502 if e.errno == zmq.EFSM:
540 if ready:
503 #io.rprint('sleep...', self.time_to_dead) # dbg
541 self._beating = True
504 time.sleep(self.time_to_dead)
542 # the poll above guarantees we have something to recv
505 self._create_socket()
543 self.socket.recv()
506 else:
544 # sleep the remainder of the cycle
507 raise
545 remainder = self.time_to_dead - (time.time() - request_time)
508 else:
546 if remainder > 0:
509 while True:
547 time.sleep(remainder)
510 try:
511 self.socket.recv(zmq.NOBLOCK)
512 except zmq.ZMQError, e:
513 #io.rprint('*** HB Error 2:', e) # dbg
514 if e.errno == zmq.EAGAIN:
515 before_poll = time.time()
516 until_dead = self.time_to_dead - (before_poll -
517 request_time)
518
519 # When the return value of poll() is an empty
520 # list, that is when things have gone wrong
521 # (zeromq bug). As long as it is not an empty
522 # list, poll is working correctly even if it
523 # returns quickly. Note: poll timeout is in
524 # milliseconds.
525 if until_dead > 0.0:
526 while True:
527 try:
528 self.poller.poll(1000 * until_dead)
529 except zmq.ZMQError as e:
530 if e.errno == errno.EINTR:
531 continue
548 continue
532 else:
549 else:
533 raise
550 # nothing was received within the time limit, signal heart failure
534 else:
551 self._beating = False
535 break
536
537 since_last_heartbeat = time.time()-request_time
552 since_last_heartbeat = time.time() - request_time
538 if since_last_heartbeat > self.time_to_dead:
539 self.call_handlers(since_last_heartbeat)
553 self.call_handlers(since_last_heartbeat)
540 break
554 # and close/reopen the socket, because the REQ/REP cycle has been broken
541 else:
555 self._create_socket()
542 # FIXME: We should probably log this instead.
556 continue
543 raise
544 else:
545 until_dead = self.time_to_dead - (time.time() -
546 request_time)
547 if until_dead > 0.0:
548 #io.rprint('sleep...', self.time_to_dead) # dbg
549 time.sleep(until_dead)
550 break
551
557
552 def pause(self):
558 def pause(self):
553 """Pause the heartbeat."""
559 """Pause the heartbeat."""
@@ -558,8 +564,8 b' class HBSocketChannel(ZMQSocketChannel):'
558 self._pause = False
564 self._pause = False
559
565
560 def is_beating(self):
566 def is_beating(self):
561 """Is the heartbeat running and not paused."""
567 """Is the heartbeat running and responsive (and not paused)."""
562 if self.is_alive() and not self._pause:
568 if self.is_alive() and not self._pause and self._beating:
563 return True
569 return True
564 else:
570 else:
565 return False
571 return False
@@ -573,7 +579,7 b' class HBSocketChannel(ZMQSocketChannel):'
573
579
574 Subclasses should override this method to handle incoming messages.
580 Subclasses should override this method to handle incoming messages.
575 It is important to remember that this method is called in the thread
581 It is important to remember that this method is called in the thread
576 so that some logic must be done to ensure that the application leve
582 so that some logic must be done to ensure that the application level
577 handlers are called in the application thread.
583 handlers are called in the application thread.
578 """
584 """
579 raise NotImplementedError('call_handlers must be defined in a subclass.')
585 raise NotImplementedError('call_handlers must be defined in a subclass.')
@@ -900,16 +906,18 b' class KernelManager(HasTraits):'
900 @property
906 @property
901 def is_alive(self):
907 def is_alive(self):
902 """Is the kernel process still running?"""
908 """Is the kernel process still running?"""
903 # FIXME: not using a heartbeat means this method is broken for any
904 # remote kernel, it's only capable of handling local kernels.
905 if self.has_kernel:
909 if self.has_kernel:
906 if self.kernel.poll() is None:
910 if self.kernel.poll() is None:
907 return True
911 return True
908 else:
912 else:
909 return False
913 return False
914 elif self._hb_channel is not None:
915 # We didn't start the kernel with this KernelManager so we
916 # use the heartbeat.
917 return self._hb_channel.is_beating()
910 else:
918 else:
911 # We didn't start the kernel with this KernelManager so we don't
919 # no heartbeat and not local, we can't tell if it's running,
912 # know if it is running. We should use a heartbeat for this case.
920 # so naively return True
913 return True
921 return True
914
922
915 #--------------------------------------------------------------------------
923 #--------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now