##// END OF EJS Templates
Make heartmonitor.Heart monitorable...
Jan Schulz -
Show More
@@ -19,7 +19,7 b' import time'
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
@@ -39,8 +39,11 b' class Heart(object):'
39 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
39 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
40 device=None
40 device=None
41 id=None
41 id=None
42 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
43 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 if mon_addr is None:
44 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
45 else:
46 self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
44 # do not allow the device to share global Context.instance,
47 # do not allow the device to share global Context.instance,
45 # which is the default behavior in pyzmq > 2.1.10
48 # which is the default behavior in pyzmq > 2.1.10
46 self.device.context_factory = zmq.Context
49 self.device.context_factory = zmq.Context
@@ -48,6 +51,8 b' class Heart(object):'
48 self.device.daemon=True
51 self.device.daemon=True
49 self.device.connect_in(in_addr)
52 self.device.connect_in(in_addr)
50 self.device.connect_out(out_addr)
53 self.device.connect_out(out_addr)
54 if mon_addr is not None:
55 self.device.connect_mon(mon_addr)
51 if in_type == zmq.SUB:
56 if in_type == zmq.SUB:
52 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
57 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
53 if heart_id is None:
58 if heart_id is None:
@@ -122,7 +127,7 b' class HeartMonitor(LoggingConfigurable):'
122 map(self.handle_heart_failure, heartfailures)
127 map(self.handle_heart_failure, heartfailures)
123 self.on_probation = missed_beats.intersection(self.hearts)
128 self.on_probation = missed_beats.intersection(self.hearts)
124 self.responses = set()
129 self.responses = set()
125 # print self.on_probation, self.hearts
130 #print self.on_probation, self.hearts
126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
131 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
132 self.pingstream.send(str_to_bytes(str(self.lifetime)))
128 # flush stream to force immediate socket send
133 # flush stream to force immediate socket send
@@ -177,6 +182,8 b" if __name__ == '__main__':"
177 outstream = zmqstream.ZMQStream(pub, loop)
182 outstream = zmqstream.ZMQStream(pub, loop)
178 instream = zmqstream.ZMQStream(router, loop)
183 instream = zmqstream.ZMQStream(router, loop)
179
184
180 hb = HeartMonitor(loop, outstream, instream)
185 hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
181
186 import logging
187 hb.log.setLevel(logging.DEBUG)
188 hb.start()
182 loop.start()
189 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now