Show More
@@ -134,11 +134,13 b' class BlockingStdInSocketChannel(StdInSocketChannel):' | |||
|
134 | 134 | |
|
135 | 135 | class BlockingHBSocketChannel(HBSocketChannel): |
|
136 | 136 | |
|
137 |
# This kernel needs r |
|
|
138 | time_to_dead = 0.2 | |
|
137 | # This kernel needs quicker monitoring, shorten to 1 sec. | |
|
138 | # less than 0.5s is unreliable, and will get occasional | |
|
139 | # false reports of missed beats. | |
|
140 | time_to_dead = 1. | |
|
139 | 141 | |
|
140 | 142 | def call_handlers(self, since_last_heartbeat): |
|
141 | #io.rprint('[[Heart]]', since_last_heartbeat) # dbg | |
|
143 | """pause beating on missed heartbeat""" | |
|
142 | 144 | pass |
|
143 | 145 | |
|
144 | 146 |
@@ -471,83 +471,89 b' class HBSocketChannel(ZMQSocketChannel):' | |||
|
471 | 471 | poller = None |
|
472 | 472 | _running = None |
|
473 | 473 | _pause = None |
|
474 | _beating = None | |
|
474 | 475 | |
|
475 | 476 | def __init__(self, context, session, address): |
|
476 | 477 | super(HBSocketChannel, self).__init__(context, session, address) |
|
477 | 478 | self._running = False |
|
478 |
self._pause = |
|
|
479 | self._pause =True | |
|
480 | self.poller = zmq.Poller() | |
|
479 | 481 | |
|
480 | 482 | def _create_socket(self): |
|
483 | if self.socket is not None: | |
|
484 | # close previous socket, before opening a new one | |
|
485 | self.poller.unregister(self.socket) | |
|
486 | self.socket.close() | |
|
481 | 487 | self.socket = self.context.socket(zmq.REQ) |
|
482 |
self.socket.setsockopt(zmq. |
|
|
488 | self.socket.setsockopt(zmq.LINGER, 0) | |
|
483 | 489 | self.socket.connect('tcp://%s:%i' % self.address) |
|
484 | self.poller = zmq.Poller() | |
|
490 | ||
|
485 | 491 | self.poller.register(self.socket, zmq.POLLIN) |
|
492 | ||
|
493 | def _poll(self, start_time): | |
|
494 | """poll for heartbeat replies until we reach self.time_to_dead | |
|
495 | ||
|
496 | Ignores interrupts, and returns the result of poll(), which | |
|
497 | will be an empty list if no messages arrived before the timeout, | |
|
498 | or the event tuple if there is a message to receive. | |
|
499 | """ | |
|
500 | ||
|
501 | until_dead = self.time_to_dead - (time.time() - start_time) | |
|
502 | # ensure poll at least once | |
|
503 | until_dead = max(until_dead, 1e-3) | |
|
504 | events = [] | |
|
505 | while True: | |
|
506 | try: | |
|
507 | events = self.poller.poll(1000 * until_dead) | |
|
508 | except zmq.ZMQError as e: | |
|
509 | if e.errno == errno.EINTR: | |
|
510 | # ignore interrupts during heartbeat | |
|
511 | # this may never actually happen | |
|
512 | until_dead = self.time_to_dead - (time.time() - start_time) | |
|
513 | until_dead = max(until_dead, 1e-3) | |
|
514 | pass | |
|
515 | else: | |
|
516 | raise | |
|
517 | else: | |
|
518 | break | |
|
519 | return events | |
|
486 | 520 | |
|
487 | 521 | def run(self): |
|
488 | 522 | """The thread's main activity. Call start() instead.""" |
|
489 | 523 | self._create_socket() |
|
490 | 524 | self._running = True |
|
525 | self._beating = True | |
|
526 | ||
|
491 | 527 | while self._running: |
|
492 | 528 | if self._pause: |
|
529 | # just sleep, and skip the rest of the loop | |
|
493 | 530 | time.sleep(self.time_to_dead) |
|
531 | continue | |
|
532 | ||
|
533 | since_last_heartbeat = 0.0 | |
|
534 | # io.rprint('Ping from HB channel') # dbg | |
|
535 | # no need to catch EFSM here, because the previous event was | |
|
536 | # either a recv or connect, which cannot be followed by EFSM | |
|
537 | self.socket.send(b'ping') | |
|
538 | request_time = time.time() | |
|
539 | ready = self._poll(request_time) | |
|
540 | if ready: | |
|
541 | self._beating = True | |
|
542 | # the poll above guarantees we have something to recv | |
|
543 | self.socket.recv() | |
|
544 | # sleep the remainder of the cycle | |
|
545 | remainder = self.time_to_dead - (time.time() - request_time) | |
|
546 | if remainder > 0: | |
|
547 | time.sleep(remainder) | |
|
548 | continue | |
|
494 | 549 | else: |
|
495 | since_last_heartbeat = 0.0 | |
|
496 |
|
|
|
497 | try: | |
|
498 | #io.rprint('Ping from HB channel') # dbg | |
|
499 | self.socket.send(b'ping') | |
|
500 | except zmq.ZMQError, e: | |
|
501 | #io.rprint('*** HB Error:', e) # dbg | |
|
502 | if e.errno == zmq.EFSM: | |
|
503 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
504 | time.sleep(self.time_to_dead) | |
|
505 | self._create_socket() | |
|
506 | else: | |
|
507 | raise | |
|
508 | else: | |
|
509 | while True: | |
|
510 | try: | |
|
511 | self.socket.recv(zmq.NOBLOCK) | |
|
512 | except zmq.ZMQError, e: | |
|
513 | #io.rprint('*** HB Error 2:', e) # dbg | |
|
514 | if e.errno == zmq.EAGAIN: | |
|
515 | before_poll = time.time() | |
|
516 | until_dead = self.time_to_dead - (before_poll - | |
|
517 | request_time) | |
|
518 | ||
|
519 | # When the return value of poll() is an empty | |
|
520 | # list, that is when things have gone wrong | |
|
521 | # (zeromq bug). As long as it is not an empty | |
|
522 | # list, poll is working correctly even if it | |
|
523 | # returns quickly. Note: poll timeout is in | |
|
524 | # milliseconds. | |
|
525 | if until_dead > 0.0: | |
|
526 | while True: | |
|
527 | try: | |
|
528 | self.poller.poll(1000 * until_dead) | |
|
529 | except zmq.ZMQError as e: | |
|
530 | if e.errno == errno.EINTR: | |
|
531 | continue | |
|
532 | else: | |
|
533 | raise | |
|
534 | else: | |
|
535 | break | |
|
536 | ||
|
537 | since_last_heartbeat = time.time()-request_time | |
|
538 | if since_last_heartbeat > self.time_to_dead: | |
|
539 | self.call_handlers(since_last_heartbeat) | |
|
540 | break | |
|
541 | else: | |
|
542 | # FIXME: We should probably log this instead. | |
|
543 | raise | |
|
544 | else: | |
|
545 | until_dead = self.time_to_dead - (time.time() - | |
|
546 | request_time) | |
|
547 | if until_dead > 0.0: | |
|
548 | #io.rprint('sleep...', self.time_to_dead) # dbg | |
|
549 | time.sleep(until_dead) | |
|
550 | break | |
|
550 | # nothing was received within the time limit, signal heart failure | |
|
551 | self._beating = False | |
|
552 | since_last_heartbeat = time.time() - request_time | |
|
553 | self.call_handlers(since_last_heartbeat) | |
|
554 | # and close/reopen the socket, because the REQ/REP cycle has been broken | |
|
555 | self._create_socket() | |
|
556 | continue | |
|
551 | 557 | |
|
552 | 558 | def pause(self): |
|
553 | 559 | """Pause the heartbeat.""" |
@@ -558,8 +564,8 b' class HBSocketChannel(ZMQSocketChannel):' | |||
|
558 | 564 | self._pause = False |
|
559 | 565 | |
|
560 | 566 | def is_beating(self): |
|
561 | """Is the heartbeat running and not paused.""" | |
|
562 | if self.is_alive() and not self._pause: | |
|
567 | """Is the heartbeat running and responsive (and not paused).""" | |
|
568 | if self.is_alive() and not self._pause and self._beating: | |
|
563 | 569 | return True |
|
564 | 570 | else: |
|
565 | 571 | return False |
@@ -573,7 +579,7 b' class HBSocketChannel(ZMQSocketChannel):' | |||
|
573 | 579 | |
|
574 | 580 | Subclasses should override this method to handle incoming messages. |
|
575 | 581 | It is important to remember that this method is called in the thread |
|
576 | so that some logic must be done to ensure that the application leve | |
|
582 | so that some logic must be done to ensure that the application level | |
|
577 | 583 | handlers are called in the application thread. |
|
578 | 584 | """ |
|
579 | 585 | raise NotImplementedError('call_handlers must be defined in a subclass.') |
@@ -900,16 +906,18 b' class KernelManager(HasTraits):' | |||
|
900 | 906 | @property |
|
901 | 907 | def is_alive(self): |
|
902 | 908 | """Is the kernel process still running?""" |
|
903 | # FIXME: not using a heartbeat means this method is broken for any | |
|
904 | # remote kernel, it's only capable of handling local kernels. | |
|
905 | 909 | if self.has_kernel: |
|
906 | 910 | if self.kernel.poll() is None: |
|
907 | 911 | return True |
|
908 | 912 | else: |
|
909 | 913 | return False |
|
914 | elif self._hb_channel is not None: | |
|
915 | # We didn't start the kernel with this KernelManager so we | |
|
916 | # use the heartbeat. | |
|
917 | return self._hb_channel.is_beating() | |
|
910 | 918 | else: |
|
911 | # We didn't start the kernel with this KernelManager so we don't | |
|
912 | # know if it is running. We should use a heartbeat for this case. | |
|
919 | # no heartbeat and not local, we can't tell if it's running, | |
|
920 | # so naively return True | |
|
913 | 921 | return True |
|
914 | 922 | |
|
915 | 923 | #-------------------------------------------------------------------------- |
General Comments 0
You need to be logged in to leave comments.
Login now