##// END OF EJS Templates
HeartMonitor.period should be an Integer...
MinRK -
Show More
@@ -1,178 +1,178 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 # do not allow the device to share global Context.instance,
43 # do not allow the device to share global Context.instance,
44 # which is the default behavior in pyzmq > 2.1.10
44 # which is the default behavior in pyzmq > 2.1.10
45 self.device.context_factory = zmq.Context
45 self.device.context_factory = zmq.Context
46
46
47 self.device.daemon=True
47 self.device.daemon=True
48 self.device.connect_in(in_addr)
48 self.device.connect_in(in_addr)
49 self.device.connect_out(out_addr)
49 self.device.connect_out(out_addr)
50 if in_type == zmq.SUB:
50 if in_type == zmq.SUB:
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
52 if heart_id is None:
52 if heart_id is None:
53 heart_id = uuid.uuid4().bytes
53 heart_id = uuid.uuid4().bytes
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
55 self.id = heart_id
55 self.id = heart_id
56
56
57 def start(self):
57 def start(self):
58 return self.device.start()
58 return self.device.start()
59
59
60
60
61 class HeartMonitor(LoggingConfigurable):
61 class HeartMonitor(LoggingConfigurable):
62 """A basic HeartMonitor class
62 """A basic HeartMonitor class
63 pingstream: a PUB stream
63 pingstream: a PUB stream
64 pongstream: an XREP stream
64 pongstream: an XREP stream
65 period: the period of the heartbeat in milliseconds"""
65 period: the period of the heartbeat in milliseconds"""
66
66
67 period=CFloat(1000, config=True,
67 period = Integer(1000, config=True,
68 help='The frequency at which the Hub pings the engines for heartbeats '
68 help='The frequency at which the Hub pings the engines for heartbeats '
69 '(in ms)',
69 '(in ms)',
70 )
70 )
71
71
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
75 def _loop_default(self):
75 def _loop_default(self):
76 return ioloop.IOLoop.instance()
76 return ioloop.IOLoop.instance()
77
77
78 # not settable:
78 # not settable:
79 hearts=Set()
79 hearts=Set()
80 responses=Set()
80 responses=Set()
81 on_probation=Set()
81 on_probation=Set()
82 last_ping=CFloat(0)
82 last_ping=CFloat(0)
83 _new_handlers = Set()
83 _new_handlers = Set()
84 _failure_handlers = Set()
84 _failure_handlers = Set()
85 lifetime = CFloat(0)
85 lifetime = CFloat(0)
86 tic = CFloat(0)
86 tic = CFloat(0)
87
87
88 def __init__(self, **kwargs):
88 def __init__(self, **kwargs):
89 super(HeartMonitor, self).__init__(**kwargs)
89 super(HeartMonitor, self).__init__(**kwargs)
90
90
91 self.pongstream.on_recv(self.handle_pong)
91 self.pongstream.on_recv(self.handle_pong)
92
92
93 def start(self):
93 def start(self):
94 self.tic = time.time()
94 self.tic = time.time()
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
96 self.caller.start()
96 self.caller.start()
97
97
98 def add_new_heart_handler(self, handler):
98 def add_new_heart_handler(self, handler):
99 """add a new handler for new hearts"""
99 """add a new handler for new hearts"""
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
101 self._new_handlers.add(handler)
101 self._new_handlers.add(handler)
102
102
103 def add_heart_failure_handler(self, handler):
103 def add_heart_failure_handler(self, handler):
104 """add a new handler for heart failure"""
104 """add a new handler for heart failure"""
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
106 self._failure_handlers.add(handler)
106 self._failure_handlers.add(handler)
107
107
108 def beat(self):
108 def beat(self):
109 self.pongstream.flush()
109 self.pongstream.flush()
110 self.last_ping = self.lifetime
110 self.last_ping = self.lifetime
111
111
112 toc = time.time()
112 toc = time.time()
113 self.lifetime += toc-self.tic
113 self.lifetime += toc-self.tic
114 self.tic = toc
114 self.tic = toc
115 self.log.debug("heartbeat::sending %s", self.lifetime)
115 self.log.debug("heartbeat::sending %s", self.lifetime)
116 goodhearts = self.hearts.intersection(self.responses)
116 goodhearts = self.hearts.intersection(self.responses)
117 missed_beats = self.hearts.difference(goodhearts)
117 missed_beats = self.hearts.difference(goodhearts)
118 heartfailures = self.on_probation.intersection(missed_beats)
118 heartfailures = self.on_probation.intersection(missed_beats)
119 newhearts = self.responses.difference(goodhearts)
119 newhearts = self.responses.difference(goodhearts)
120 map(self.handle_new_heart, newhearts)
120 map(self.handle_new_heart, newhearts)
121 map(self.handle_heart_failure, heartfailures)
121 map(self.handle_heart_failure, heartfailures)
122 self.on_probation = missed_beats.intersection(self.hearts)
122 self.on_probation = missed_beats.intersection(self.hearts)
123 self.responses = set()
123 self.responses = set()
124 # print self.on_probation, self.hearts
124 # print self.on_probation, self.hearts
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127
127
128 def handle_new_heart(self, heart):
128 def handle_new_heart(self, heart):
129 if self._new_handlers:
129 if self._new_handlers:
130 for handler in self._new_handlers:
130 for handler in self._new_handlers:
131 handler(heart)
131 handler(heart)
132 else:
132 else:
133 self.log.info("heartbeat::yay, got new heart %s!", heart)
133 self.log.info("heartbeat::yay, got new heart %s!", heart)
134 self.hearts.add(heart)
134 self.hearts.add(heart)
135
135
136 def handle_heart_failure(self, heart):
136 def handle_heart_failure(self, heart):
137 if self._failure_handlers:
137 if self._failure_handlers:
138 for handler in self._failure_handlers:
138 for handler in self._failure_handlers:
139 try:
139 try:
140 handler(heart)
140 handler(heart)
141 except Exception as e:
141 except Exception as e:
142 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
142 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
143 pass
143 pass
144 else:
144 else:
145 self.log.info("heartbeat::Heart %s failed :(", heart)
145 self.log.info("heartbeat::Heart %s failed :(", heart)
146 self.hearts.remove(heart)
146 self.hearts.remove(heart)
147
147
148
148
149 def handle_pong(self, msg):
149 def handle_pong(self, msg):
150 "a heart just beat"
150 "a heart just beat"
151 current = asbytes(str(self.lifetime))
151 current = asbytes(str(self.lifetime))
152 last = asbytes(str(self.last_ping))
152 last = asbytes(str(self.last_ping))
153 if msg[1] == current:
153 if msg[1] == current:
154 delta = time.time()-self.tic
154 delta = time.time()-self.tic
155 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
155 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
156 self.responses.add(msg[0])
156 self.responses.add(msg[0])
157 elif msg[1] == last:
157 elif msg[1] == last:
158 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
158 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
159 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
159 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
160 self.responses.add(msg[0])
160 self.responses.add(msg[0])
161 else:
161 else:
162 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
162 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
163
163
164
164
165 if __name__ == '__main__':
165 if __name__ == '__main__':
166 loop = ioloop.IOLoop.instance()
166 loop = ioloop.IOLoop.instance()
167 context = zmq.Context()
167 context = zmq.Context()
168 pub = context.socket(zmq.PUB)
168 pub = context.socket(zmq.PUB)
169 pub.bind('tcp://127.0.0.1:5555')
169 pub.bind('tcp://127.0.0.1:5555')
170 xrep = context.socket(zmq.ROUTER)
170 xrep = context.socket(zmq.ROUTER)
171 xrep.bind('tcp://127.0.0.1:5556')
171 xrep.bind('tcp://127.0.0.1:5556')
172
172
173 outstream = zmqstream.ZMQStream(pub, loop)
173 outstream = zmqstream.ZMQStream(pub, loop)
174 instream = zmqstream.ZMQStream(xrep, loop)
174 instream = zmqstream.ZMQStream(xrep, loop)
175
175
176 hb = HeartMonitor(loop, outstream, instream)
176 hb = HeartMonitor(loop, outstream, instream)
177
177
178 loop.start()
178 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now