Show More
@@ -19,7 +19,7 b' import time' | |||||
19 | import uuid |
|
19 | import uuid | |
20 |
|
20 | |||
21 | import zmq |
|
21 | import zmq | |
22 | from zmq.devices import ThreadDevice |
|
22 | from zmq.devices import ThreadDevice, ThreadMonitoredQueue | |
23 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
24 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
@@ -39,8 +39,11 b' class Heart(object):' | |||||
39 | You can specify the DEALER's IDENTITY via the optional heart_id argument.""" |
|
39 | You can specify the DEALER's IDENTITY via the optional heart_id argument.""" | |
40 | device=None |
|
40 | device=None | |
41 | id=None |
|
41 | id=None | |
42 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None): |
|
42 | def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None): | |
43 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) |
|
43 | if mon_addr is None: | |
|
44 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | |||
|
45 | else: | |||
|
46 | self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"") | |||
44 | # do not allow the device to share global Context.instance, |
|
47 | # do not allow the device to share global Context.instance, | |
45 | # which is the default behavior in pyzmq > 2.1.10 |
|
48 | # which is the default behavior in pyzmq > 2.1.10 | |
46 | self.device.context_factory = zmq.Context |
|
49 | self.device.context_factory = zmq.Context | |
@@ -48,6 +51,8 b' class Heart(object):' | |||||
48 | self.device.daemon=True |
|
51 | self.device.daemon=True | |
49 | self.device.connect_in(in_addr) |
|
52 | self.device.connect_in(in_addr) | |
50 | self.device.connect_out(out_addr) |
|
53 | self.device.connect_out(out_addr) | |
|
54 | if mon_addr is not None: | |||
|
55 | self.device.connect_mon(mon_addr) | |||
51 | if in_type == zmq.SUB: |
|
56 | if in_type == zmq.SUB: | |
52 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
57 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
53 | if heart_id is None: |
|
58 | if heart_id is None: | |
@@ -122,7 +127,7 b' class HeartMonitor(LoggingConfigurable):' | |||||
122 | map(self.handle_heart_failure, heartfailures) |
|
127 | map(self.handle_heart_failure, heartfailures) | |
123 | self.on_probation = missed_beats.intersection(self.hearts) |
|
128 | self.on_probation = missed_beats.intersection(self.hearts) | |
124 | self.responses = set() |
|
129 | self.responses = set() | |
125 |
# |
|
130 | #print self.on_probation, self.hearts | |
126 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
131 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) | |
127 | self.pingstream.send(str_to_bytes(str(self.lifetime))) |
|
132 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
128 | # flush stream to force immediate socket send |
|
133 | # flush stream to force immediate socket send | |
@@ -177,6 +182,8 b" if __name__ == '__main__':" | |||||
177 | outstream = zmqstream.ZMQStream(pub, loop) |
|
182 | outstream = zmqstream.ZMQStream(pub, loop) | |
178 | instream = zmqstream.ZMQStream(router, loop) |
|
183 | instream = zmqstream.ZMQStream(router, loop) | |
179 |
|
184 | |||
180 | hb = HeartMonitor(loop, outstream, instream) |
|
185 | hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream) | |
181 |
|
186 | import logging | ||
|
187 | hb.log.setLevel(logging.DEBUG) | |||
|
188 | hb.start() | |||
182 | loop.start() |
|
189 | loop.start() |
General Comments 0
You need to be logged in to leave comments.
Login now