Show More
@@ -1,173 +1,173 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """ |
|
2 | """ | |
3 | A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, |
|
3 | A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | |
4 | and hearts are tracked based on their XREQ identities. |
|
4 | and hearts are tracked based on their XREQ identities. | |
5 |
|
5 | |||
6 | Authors: |
|
6 | Authors: | |
7 |
|
7 | |||
8 | * Min RK |
|
8 | * Min RK | |
9 | """ |
|
9 | """ | |
10 | #----------------------------------------------------------------------------- |
|
10 | #----------------------------------------------------------------------------- | |
11 | # Copyright (C) 2010-2011 The IPython Development Team |
|
11 | # Copyright (C) 2010-2011 The IPython Development Team | |
12 | # |
|
12 | # | |
13 | # Distributed under the terms of the BSD License. The full license is in |
|
13 | # Distributed under the terms of the BSD License. The full license is in | |
14 | # the file COPYING, distributed as part of this software. |
|
14 | # the file COPYING, distributed as part of this software. | |
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 |
|
16 | |||
17 | from __future__ import print_function |
|
17 | from __future__ import print_function | |
18 | import time |
|
18 | import time | |
19 | import uuid |
|
19 | import uuid | |
20 |
|
20 | |||
21 | import zmq |
|
21 | import zmq | |
22 | from zmq.devices import ThreadDevice |
|
22 | from zmq.devices import ThreadDevice | |
23 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
24 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
26 | from IPython.utils.traitlets import Set, Instance, CFloat |
|
26 | from IPython.utils.traitlets import Set, Instance, CFloat | |
27 |
|
27 | |||
28 | from IPython.parallel.util import asbytes |
|
28 | from IPython.parallel.util import asbytes | |
29 |
|
29 | |||
30 | class Heart(object): |
|
30 | class Heart(object): | |
31 | """A basic heart object for responding to a HeartMonitor. |
|
31 | """A basic heart object for responding to a HeartMonitor. | |
32 | This is a simple wrapper with defaults for the most common |
|
32 | This is a simple wrapper with defaults for the most common | |
33 | Device model for responding to heartbeats. |
|
33 | Device model for responding to heartbeats. | |
34 |
|
34 | |||
35 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using |
|
35 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | |
36 | SUB/XREQ for in/out. |
|
36 | SUB/XREQ for in/out. | |
37 |
|
37 | |||
38 | You can specify the XREQ's IDENTITY via the optional heart_id argument.""" |
|
38 | You can specify the XREQ's IDENTITY via the optional heart_id argument.""" | |
39 | device=None |
|
39 | device=None | |
40 | id=None |
|
40 | id=None | |
41 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None): |
|
41 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None): | |
42 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) |
|
42 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | |
43 | self.device.daemon=True |
|
43 | self.device.daemon=True | |
44 | self.device.connect_in(in_addr) |
|
44 | self.device.connect_in(in_addr) | |
45 | self.device.connect_out(out_addr) |
|
45 | self.device.connect_out(out_addr) | |
46 | if in_type == zmq.SUB: |
|
46 | if in_type == zmq.SUB: | |
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
48 | if heart_id is None: |
|
48 | if heart_id is None: | |
49 | heart_id = uuid.uuid4().bytes |
|
49 | heart_id = uuid.uuid4().bytes | |
50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |
51 | self.id = heart_id |
|
51 | self.id = heart_id | |
52 |
|
52 | |||
53 | def start(self): |
|
53 | def start(self): | |
54 | return self.device.start() |
|
54 | return self.device.start() | |
55 |
|
55 | |||
56 | class HeartMonitor(LoggingConfigurable): |
|
56 | class HeartMonitor(LoggingConfigurable): | |
57 | """A basic HeartMonitor class |
|
57 | """A basic HeartMonitor class | |
58 | pingstream: a PUB stream |
|
58 | pingstream: a PUB stream | |
59 | pongstream: an XREP stream |
|
59 | pongstream: an XREP stream | |
60 | period: the period of the heartbeat in milliseconds""" |
|
60 | period: the period of the heartbeat in milliseconds""" | |
61 |
|
61 | |||
62 | period=CFloat(1000, config=True, |
|
62 | period=CFloat(1000, config=True, | |
63 | help='The frequency at which the Hub pings the engines for heartbeats ' |
|
63 | help='The frequency at which the Hub pings the engines for heartbeats ' | |
64 |
' |
|
64 | '(in ms)', | |
65 | ) |
|
65 | ) | |
66 |
|
66 | |||
67 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
67 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
68 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
68 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
69 | loop = Instance('zmq.eventloop.ioloop.IOLoop') |
|
69 | loop = Instance('zmq.eventloop.ioloop.IOLoop') | |
70 | def _loop_default(self): |
|
70 | def _loop_default(self): | |
71 | return ioloop.IOLoop.instance() |
|
71 | return ioloop.IOLoop.instance() | |
72 |
|
72 | |||
73 | # not settable: |
|
73 | # not settable: | |
74 | hearts=Set() |
|
74 | hearts=Set() | |
75 | responses=Set() |
|
75 | responses=Set() | |
76 | on_probation=Set() |
|
76 | on_probation=Set() | |
77 | last_ping=CFloat(0) |
|
77 | last_ping=CFloat(0) | |
78 | _new_handlers = Set() |
|
78 | _new_handlers = Set() | |
79 | _failure_handlers = Set() |
|
79 | _failure_handlers = Set() | |
80 | lifetime = CFloat(0) |
|
80 | lifetime = CFloat(0) | |
81 | tic = CFloat(0) |
|
81 | tic = CFloat(0) | |
82 |
|
82 | |||
83 | def __init__(self, **kwargs): |
|
83 | def __init__(self, **kwargs): | |
84 | super(HeartMonitor, self).__init__(**kwargs) |
|
84 | super(HeartMonitor, self).__init__(**kwargs) | |
85 |
|
85 | |||
86 | self.pongstream.on_recv(self.handle_pong) |
|
86 | self.pongstream.on_recv(self.handle_pong) | |
87 |
|
87 | |||
88 | def start(self): |
|
88 | def start(self): | |
89 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) |
|
89 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | |
90 | self.caller.start() |
|
90 | self.caller.start() | |
91 |
|
91 | |||
92 | def add_new_heart_handler(self, handler): |
|
92 | def add_new_heart_handler(self, handler): | |
93 | """add a new handler for new hearts""" |
|
93 | """add a new handler for new hearts""" | |
94 | self.log.debug("heartbeat::new_heart_handler: %s"%handler) |
|
94 | self.log.debug("heartbeat::new_heart_handler: %s"%handler) | |
95 | self._new_handlers.add(handler) |
|
95 | self._new_handlers.add(handler) | |
96 |
|
96 | |||
97 | def add_heart_failure_handler(self, handler): |
|
97 | def add_heart_failure_handler(self, handler): | |
98 | """add a new handler for heart failure""" |
|
98 | """add a new handler for heart failure""" | |
99 | self.log.debug("heartbeat::new heart failure handler: %s"%handler) |
|
99 | self.log.debug("heartbeat::new heart failure handler: %s"%handler) | |
100 | self._failure_handlers.add(handler) |
|
100 | self._failure_handlers.add(handler) | |
101 |
|
101 | |||
102 | def beat(self): |
|
102 | def beat(self): | |
103 | self.pongstream.flush() |
|
103 | self.pongstream.flush() | |
104 | self.last_ping = self.lifetime |
|
104 | self.last_ping = self.lifetime | |
105 |
|
105 | |||
106 | toc = time.time() |
|
106 | toc = time.time() | |
107 | self.lifetime += toc-self.tic |
|
107 | self.lifetime += toc-self.tic | |
108 | self.tic = toc |
|
108 | self.tic = toc | |
109 | # self.log.debug("heartbeat::%s"%self.lifetime) |
|
109 | # self.log.debug("heartbeat::%s"%self.lifetime) | |
110 | goodhearts = self.hearts.intersection(self.responses) |
|
110 | goodhearts = self.hearts.intersection(self.responses) | |
111 | missed_beats = self.hearts.difference(goodhearts) |
|
111 | missed_beats = self.hearts.difference(goodhearts) | |
112 | heartfailures = self.on_probation.intersection(missed_beats) |
|
112 | heartfailures = self.on_probation.intersection(missed_beats) | |
113 | newhearts = self.responses.difference(goodhearts) |
|
113 | newhearts = self.responses.difference(goodhearts) | |
114 | map(self.handle_new_heart, newhearts) |
|
114 | map(self.handle_new_heart, newhearts) | |
115 | map(self.handle_heart_failure, heartfailures) |
|
115 | map(self.handle_heart_failure, heartfailures) | |
116 | self.on_probation = missed_beats.intersection(self.hearts) |
|
116 | self.on_probation = missed_beats.intersection(self.hearts) | |
117 | self.responses = set() |
|
117 | self.responses = set() | |
118 | # print self.on_probation, self.hearts |
|
118 | # print self.on_probation, self.hearts | |
119 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) |
|
119 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | |
120 | self.pingstream.send(asbytes(str(self.lifetime))) |
|
120 | self.pingstream.send(asbytes(str(self.lifetime))) | |
121 |
|
121 | |||
122 | def handle_new_heart(self, heart): |
|
122 | def handle_new_heart(self, heart): | |
123 | if self._new_handlers: |
|
123 | if self._new_handlers: | |
124 | for handler in self._new_handlers: |
|
124 | for handler in self._new_handlers: | |
125 | handler(heart) |
|
125 | handler(heart) | |
126 | else: |
|
126 | else: | |
127 | self.log.info("heartbeat::yay, got new heart %s!"%heart) |
|
127 | self.log.info("heartbeat::yay, got new heart %s!"%heart) | |
128 | self.hearts.add(heart) |
|
128 | self.hearts.add(heart) | |
129 |
|
129 | |||
130 | def handle_heart_failure(self, heart): |
|
130 | def handle_heart_failure(self, heart): | |
131 | if self._failure_handlers: |
|
131 | if self._failure_handlers: | |
132 | for handler in self._failure_handlers: |
|
132 | for handler in self._failure_handlers: | |
133 | try: |
|
133 | try: | |
134 | handler(heart) |
|
134 | handler(heart) | |
135 | except Exception as e: |
|
135 | except Exception as e: | |
136 | self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) |
|
136 | self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) | |
137 | pass |
|
137 | pass | |
138 | else: |
|
138 | else: | |
139 | self.log.info("heartbeat::Heart %s failed :("%heart) |
|
139 | self.log.info("heartbeat::Heart %s failed :("%heart) | |
140 | self.hearts.remove(heart) |
|
140 | self.hearts.remove(heart) | |
141 |
|
141 | |||
142 |
|
142 | |||
143 | def handle_pong(self, msg): |
|
143 | def handle_pong(self, msg): | |
144 | "a heart just beat" |
|
144 | "a heart just beat" | |
145 | current = asbytes(str(self.lifetime)) |
|
145 | current = asbytes(str(self.lifetime)) | |
146 | last = asbytes(str(self.last_ping)) |
|
146 | last = asbytes(str(self.last_ping)) | |
147 | if msg[1] == current: |
|
147 | if msg[1] == current: | |
148 | delta = time.time()-self.tic |
|
148 | delta = time.time()-self.tic | |
149 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
149 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
150 | self.responses.add(msg[0]) |
|
150 | self.responses.add(msg[0]) | |
151 | elif msg[1] == last: |
|
151 | elif msg[1] == last: | |
152 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
152 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) | |
153 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
153 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | |
154 | self.responses.add(msg[0]) |
|
154 | self.responses.add(msg[0]) | |
155 | else: |
|
155 | else: | |
156 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% |
|
156 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% | |
157 | (msg[1],self.lifetime)) |
|
157 | (msg[1],self.lifetime)) | |
158 |
|
158 | |||
159 |
|
159 | |||
160 | if __name__ == '__main__': |
|
160 | if __name__ == '__main__': | |
161 | loop = ioloop.IOLoop.instance() |
|
161 | loop = ioloop.IOLoop.instance() | |
162 | context = zmq.Context() |
|
162 | context = zmq.Context() | |
163 | pub = context.socket(zmq.PUB) |
|
163 | pub = context.socket(zmq.PUB) | |
164 | pub.bind('tcp://127.0.0.1:5555') |
|
164 | pub.bind('tcp://127.0.0.1:5555') | |
165 | xrep = context.socket(zmq.ROUTER) |
|
165 | xrep = context.socket(zmq.ROUTER) | |
166 | xrep.bind('tcp://127.0.0.1:5556') |
|
166 | xrep.bind('tcp://127.0.0.1:5556') | |
167 |
|
167 | |||
168 | outstream = zmqstream.ZMQStream(pub, loop) |
|
168 | outstream = zmqstream.ZMQStream(pub, loop) | |
169 | instream = zmqstream.ZMQStream(xrep, loop) |
|
169 | instream = zmqstream.ZMQStream(xrep, loop) | |
170 |
|
170 | |||
171 | hb = HeartMonitor(loop, outstream, instream) |
|
171 | hb = HeartMonitor(loop, outstream, instream) | |
172 |
|
172 | |||
173 | loop.start() |
|
173 | loop.start() |
General Comments 0
You need to be logged in to leave comments.
Login now