##// END OF EJS Templates
* Fixed heartbeat thread not stopping cleanly....
epatters -
Show More
@@ -448,8 +448,7 b' class RepSocketChannel(ZmqSocketChannel):'
448
448
449
449
450 class HBSocketChannel(ZmqSocketChannel):
450 class HBSocketChannel(ZmqSocketChannel):
451 """The heartbeat channel which monitors the kernel heartbeat.
451 """The heartbeat channel which monitors the kernel heartbeat."""
452 """
453
452
454 time_to_dead = 5.0
453 time_to_dead = 5.0
455 socket = None
454 socket = None
@@ -457,6 +456,7 b' class HBSocketChannel(ZmqSocketChannel):'
457
456
458 def __init__(self, context, session, address):
457 def __init__(self, context, session, address):
459 super(HBSocketChannel, self).__init__(context, session, address)
458 super(HBSocketChannel, self).__init__(context, session, address)
459 self._running = False
460
460
461 def _create_socket(self):
461 def _create_socket(self):
462 self.socket = self.context.socket(zmq.REQ)
462 self.socket = self.context.socket(zmq.REQ)
@@ -469,7 +469,8 b' class HBSocketChannel(ZmqSocketChannel):'
469 """The thread's main activity. Call start() instead."""
469 """The thread's main activity. Call start() instead."""
470 self._create_socket()
470 self._create_socket()
471
471
472 while True:
472 self._running = True
473 while self._running:
473 since_last_heartbeat = 0.0
474 since_last_heartbeat = 0.0
474 request_time = time.time()
475 request_time = time.time()
475 try:
476 try:
@@ -486,21 +487,27 b' class HBSocketChannel(ZmqSocketChannel):'
486 reply = self.socket.recv_json(zmq.NOBLOCK)
487 reply = self.socket.recv_json(zmq.NOBLOCK)
487 except zmq.ZMQError, e:
488 except zmq.ZMQError, e:
488 if e.errno == zmq.EAGAIN:
489 if e.errno == zmq.EAGAIN:
489 until_dead = self.time_to_dead-(time.time()-request_time)
490 until_dead = self.time_to_dead - (time.time() -
491 request_time)
490 self.poller.poll(until_dead)
492 self.poller.poll(until_dead)
491 since_last_heartbeat = time.time() - request_time
493 since_last_heartbeat = time.time() - request_time
492 if since_last_heartbeat > self.time_to_dead:
494 if since_last_heartbeat > self.time_to_dead:
493 self.call_handlers(since_last_heartbeat)
495 self.call_handlers(since_last_heartbeat)
494 break
496 break
495 else:
497 else:
496 # We should probably log this instead
498 # FIXME: We should probably log this instead.
497 raise
499 raise
498 else:
500 else:
499 until_dead = self.time_to_dead-(time.time()-request_time)
501 until_dead = self.time_to_dead - (time.time() -
502 request_time)
500 if until_dead > 0.0:
503 if until_dead > 0.0:
501 time.sleep(until_dead)
504 time.sleep(until_dead)
502 break
505 break
503
506
507 def stop(self):
508 self._running = False
509 super(HBSocketChannel, self).stop()
510
504 def call_handlers(self, since_last_heartbeat):
511 def call_handlers(self, since_last_heartbeat):
505 """This method is called in the ioloop thread when a message arrives.
512 """This method is called in the ioloop thread when a message arrives.
506
513
@@ -608,7 +615,8 b' class KernelManager(HasTraits):'
608 """
615 """
609 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
616 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
610 self.rep_address, self.hb_address
617 self.rep_address, self.hb_address
611 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST or hb[0] != LOCALHOST:
618 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
619 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
612 raise RuntimeError("Can only launch a kernel on localhost."
620 raise RuntimeError("Can only launch a kernel on localhost."
613 "Make sure that the '*_address' attributes are "
621 "Make sure that the '*_address' attributes are "
614 "configured properly.")
622 "configured properly.")
@@ -619,8 +627,8 b' class KernelManager(HasTraits):'
619 else:
627 else:
620 from pykernel import launch_kernel as launch
628 from pykernel import launch_kernel as launch
621 self.kernel, xrep, pub, req, hb = launch(
629 self.kernel, xrep, pub, req, hb = launch(
622 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1],
630 xrep_port=xreq[1], pub_port=sub[1],
623 hb_port=hb[1], **kw)
631 req_port=rep[1], hb_port=hb[1], **kw)
624 self.xreq_address = (LOCALHOST, xrep)
632 self.xreq_address = (LOCALHOST, xrep)
625 self.sub_address = (LOCALHOST, pub)
633 self.sub_address = (LOCALHOST, pub)
626 self.rep_address = (LOCALHOST, req)
634 self.rep_address = (LOCALHOST, req)
@@ -637,7 +645,7 b' class KernelManager(HasTraits):'
637 else:
645 else:
638 if self.has_kernel:
646 if self.has_kernel:
639 self.kill_kernel()
647 self.kill_kernel()
640 self.start_kernel(*self._launch_args)
648 self.start_kernel(**self._launch_args)
641
649
642 @property
650 @property
643 def has_kernel(self):
651 def has_kernel(self):
General Comments 0
You need to be logged in to leave comments. Login now