##// END OF EJS Templates
fix typo in HeartMonitor.period helpstring
MinRK -
Show More
@@ -1,173 +1,173 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 self.device.daemon=True
43 self.device.daemon=True
44 self.device.connect_in(in_addr)
44 self.device.connect_in(in_addr)
45 self.device.connect_out(out_addr)
45 self.device.connect_out(out_addr)
46 if in_type == zmq.SUB:
46 if in_type == zmq.SUB:
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 if heart_id is None:
48 if heart_id is None:
49 heart_id = uuid.uuid4().bytes
49 heart_id = uuid.uuid4().bytes
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 self.id = heart_id
51 self.id = heart_id
52
52
53 def start(self):
53 def start(self):
54 return self.device.start()
54 return self.device.start()
55
55
56 class HeartMonitor(LoggingConfigurable):
56 class HeartMonitor(LoggingConfigurable):
57 """A basic HeartMonitor class
57 """A basic HeartMonitor class
58 pingstream: a PUB stream
58 pingstream: a PUB stream
59 pongstream: an XREP stream
59 pongstream: an XREP stream
60 period: the period of the heartbeat in milliseconds"""
60 period: the period of the heartbeat in milliseconds"""
61
61
62 period=CFloat(1000, config=True,
62 period=CFloat(1000, config=True,
63 help='The frequency at which the Hub pings the engines for heartbeats '
63 help='The frequency at which the Hub pings the engines for heartbeats '
64 ' (in ms) [default: 100]',
64 '(in ms)',
65 )
65 )
66
66
67 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 loop = Instance('zmq.eventloop.ioloop.IOLoop')
69 loop = Instance('zmq.eventloop.ioloop.IOLoop')
70 def _loop_default(self):
70 def _loop_default(self):
71 return ioloop.IOLoop.instance()
71 return ioloop.IOLoop.instance()
72
72
73 # not settable:
73 # not settable:
74 hearts=Set()
74 hearts=Set()
75 responses=Set()
75 responses=Set()
76 on_probation=Set()
76 on_probation=Set()
77 last_ping=CFloat(0)
77 last_ping=CFloat(0)
78 _new_handlers = Set()
78 _new_handlers = Set()
79 _failure_handlers = Set()
79 _failure_handlers = Set()
80 lifetime = CFloat(0)
80 lifetime = CFloat(0)
81 tic = CFloat(0)
81 tic = CFloat(0)
82
82
83 def __init__(self, **kwargs):
83 def __init__(self, **kwargs):
84 super(HeartMonitor, self).__init__(**kwargs)
84 super(HeartMonitor, self).__init__(**kwargs)
85
85
86 self.pongstream.on_recv(self.handle_pong)
86 self.pongstream.on_recv(self.handle_pong)
87
87
88 def start(self):
88 def start(self):
89 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
89 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
90 self.caller.start()
90 self.caller.start()
91
91
92 def add_new_heart_handler(self, handler):
92 def add_new_heart_handler(self, handler):
93 """add a new handler for new hearts"""
93 """add a new handler for new hearts"""
94 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
94 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
95 self._new_handlers.add(handler)
95 self._new_handlers.add(handler)
96
96
97 def add_heart_failure_handler(self, handler):
97 def add_heart_failure_handler(self, handler):
98 """add a new handler for heart failure"""
98 """add a new handler for heart failure"""
99 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
99 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
100 self._failure_handlers.add(handler)
100 self._failure_handlers.add(handler)
101
101
102 def beat(self):
102 def beat(self):
103 self.pongstream.flush()
103 self.pongstream.flush()
104 self.last_ping = self.lifetime
104 self.last_ping = self.lifetime
105
105
106 toc = time.time()
106 toc = time.time()
107 self.lifetime += toc-self.tic
107 self.lifetime += toc-self.tic
108 self.tic = toc
108 self.tic = toc
109 # self.log.debug("heartbeat::%s"%self.lifetime)
109 # self.log.debug("heartbeat::%s"%self.lifetime)
110 goodhearts = self.hearts.intersection(self.responses)
110 goodhearts = self.hearts.intersection(self.responses)
111 missed_beats = self.hearts.difference(goodhearts)
111 missed_beats = self.hearts.difference(goodhearts)
112 heartfailures = self.on_probation.intersection(missed_beats)
112 heartfailures = self.on_probation.intersection(missed_beats)
113 newhearts = self.responses.difference(goodhearts)
113 newhearts = self.responses.difference(goodhearts)
114 map(self.handle_new_heart, newhearts)
114 map(self.handle_new_heart, newhearts)
115 map(self.handle_heart_failure, heartfailures)
115 map(self.handle_heart_failure, heartfailures)
116 self.on_probation = missed_beats.intersection(self.hearts)
116 self.on_probation = missed_beats.intersection(self.hearts)
117 self.responses = set()
117 self.responses = set()
118 # print self.on_probation, self.hearts
118 # print self.on_probation, self.hearts
119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
120 self.pingstream.send(asbytes(str(self.lifetime)))
120 self.pingstream.send(asbytes(str(self.lifetime)))
121
121
122 def handle_new_heart(self, heart):
122 def handle_new_heart(self, heart):
123 if self._new_handlers:
123 if self._new_handlers:
124 for handler in self._new_handlers:
124 for handler in self._new_handlers:
125 handler(heart)
125 handler(heart)
126 else:
126 else:
127 self.log.info("heartbeat::yay, got new heart %s!"%heart)
127 self.log.info("heartbeat::yay, got new heart %s!"%heart)
128 self.hearts.add(heart)
128 self.hearts.add(heart)
129
129
130 def handle_heart_failure(self, heart):
130 def handle_heart_failure(self, heart):
131 if self._failure_handlers:
131 if self._failure_handlers:
132 for handler in self._failure_handlers:
132 for handler in self._failure_handlers:
133 try:
133 try:
134 handler(heart)
134 handler(heart)
135 except Exception as e:
135 except Exception as e:
136 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
136 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
137 pass
137 pass
138 else:
138 else:
139 self.log.info("heartbeat::Heart %s failed :("%heart)
139 self.log.info("heartbeat::Heart %s failed :("%heart)
140 self.hearts.remove(heart)
140 self.hearts.remove(heart)
141
141
142
142
143 def handle_pong(self, msg):
143 def handle_pong(self, msg):
144 "a heart just beat"
144 "a heart just beat"
145 current = asbytes(str(self.lifetime))
145 current = asbytes(str(self.lifetime))
146 last = asbytes(str(self.last_ping))
146 last = asbytes(str(self.last_ping))
147 if msg[1] == current:
147 if msg[1] == current:
148 delta = time.time()-self.tic
148 delta = time.time()-self.tic
149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
150 self.responses.add(msg[0])
150 self.responses.add(msg[0])
151 elif msg[1] == last:
151 elif msg[1] == last:
152 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
152 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
153 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
153 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
154 self.responses.add(msg[0])
154 self.responses.add(msg[0])
155 else:
155 else:
156 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
156 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
157 (msg[1],self.lifetime))
157 (msg[1],self.lifetime))
158
158
159
159
160 if __name__ == '__main__':
160 if __name__ == '__main__':
161 loop = ioloop.IOLoop.instance()
161 loop = ioloop.IOLoop.instance()
162 context = zmq.Context()
162 context = zmq.Context()
163 pub = context.socket(zmq.PUB)
163 pub = context.socket(zmq.PUB)
164 pub.bind('tcp://127.0.0.1:5555')
164 pub.bind('tcp://127.0.0.1:5555')
165 xrep = context.socket(zmq.ROUTER)
165 xrep = context.socket(zmq.ROUTER)
166 xrep.bind('tcp://127.0.0.1:5556')
166 xrep.bind('tcp://127.0.0.1:5556')
167
167
168 outstream = zmqstream.ZMQStream(pub, loop)
168 outstream = zmqstream.ZMQStream(pub, loop)
169 instream = zmqstream.ZMQStream(xrep, loop)
169 instream = zmqstream.ZMQStream(xrep, loop)
170
170
171 hb = HeartMonitor(loop, outstream, instream)
171 hb = HeartMonitor(loop, outstream, instream)
172
172
173 loop.start()
173 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now