##// END OF EJS Templates
fix typo in HeartMonitor.period helpstring
MinRK -
Show More
@@ -1,173 +1,173 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat
27 27
28 28 from IPython.parallel.util import asbytes
29 29
30 30 class Heart(object):
31 31 """A basic heart object for responding to a HeartMonitor.
32 32 This is a simple wrapper with defaults for the most common
33 33 Device model for responding to heartbeats.
34 34
35 35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 36 SUB/XREQ for in/out.
37 37
38 38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 39 device=None
40 40 id=None
41 41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 43 self.device.daemon=True
44 44 self.device.connect_in(in_addr)
45 45 self.device.connect_out(out_addr)
46 46 if in_type == zmq.SUB:
47 47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 48 if heart_id is None:
49 49 heart_id = uuid.uuid4().bytes
50 50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 51 self.id = heart_id
52 52
53 53 def start(self):
54 54 return self.device.start()
55 55
56 56 class HeartMonitor(LoggingConfigurable):
57 57 """A basic HeartMonitor class
58 58 pingstream: a PUB stream
59 59 pongstream: an XREP stream
60 60 period: the period of the heartbeat in milliseconds"""
61 61
62 62 period=CFloat(1000, config=True,
63 63 help='The frequency at which the Hub pings the engines for heartbeats '
64 ' (in ms) [default: 100]',
64 '(in ms)',
65 65 )
66 66
67 67 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 68 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 69 loop = Instance('zmq.eventloop.ioloop.IOLoop')
70 70 def _loop_default(self):
71 71 return ioloop.IOLoop.instance()
72 72
73 73 # not settable:
74 74 hearts=Set()
75 75 responses=Set()
76 76 on_probation=Set()
77 77 last_ping=CFloat(0)
78 78 _new_handlers = Set()
79 79 _failure_handlers = Set()
80 80 lifetime = CFloat(0)
81 81 tic = CFloat(0)
82 82
83 83 def __init__(self, **kwargs):
84 84 super(HeartMonitor, self).__init__(**kwargs)
85 85
86 86 self.pongstream.on_recv(self.handle_pong)
87 87
88 88 def start(self):
89 89 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
90 90 self.caller.start()
91 91
92 92 def add_new_heart_handler(self, handler):
93 93 """add a new handler for new hearts"""
94 94 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
95 95 self._new_handlers.add(handler)
96 96
97 97 def add_heart_failure_handler(self, handler):
98 98 """add a new handler for heart failure"""
99 99 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
100 100 self._failure_handlers.add(handler)
101 101
102 102 def beat(self):
103 103 self.pongstream.flush()
104 104 self.last_ping = self.lifetime
105 105
106 106 toc = time.time()
107 107 self.lifetime += toc-self.tic
108 108 self.tic = toc
109 109 # self.log.debug("heartbeat::%s"%self.lifetime)
110 110 goodhearts = self.hearts.intersection(self.responses)
111 111 missed_beats = self.hearts.difference(goodhearts)
112 112 heartfailures = self.on_probation.intersection(missed_beats)
113 113 newhearts = self.responses.difference(goodhearts)
114 114 map(self.handle_new_heart, newhearts)
115 115 map(self.handle_heart_failure, heartfailures)
116 116 self.on_probation = missed_beats.intersection(self.hearts)
117 117 self.responses = set()
118 118 # print self.on_probation, self.hearts
119 119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
120 120 self.pingstream.send(asbytes(str(self.lifetime)))
121 121
122 122 def handle_new_heart(self, heart):
123 123 if self._new_handlers:
124 124 for handler in self._new_handlers:
125 125 handler(heart)
126 126 else:
127 127 self.log.info("heartbeat::yay, got new heart %s!"%heart)
128 128 self.hearts.add(heart)
129 129
130 130 def handle_heart_failure(self, heart):
131 131 if self._failure_handlers:
132 132 for handler in self._failure_handlers:
133 133 try:
134 134 handler(heart)
135 135 except Exception as e:
136 136 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
137 137 pass
138 138 else:
139 139 self.log.info("heartbeat::Heart %s failed :("%heart)
140 140 self.hearts.remove(heart)
141 141
142 142
143 143 def handle_pong(self, msg):
144 144 "a heart just beat"
145 145 current = asbytes(str(self.lifetime))
146 146 last = asbytes(str(self.last_ping))
147 147 if msg[1] == current:
148 148 delta = time.time()-self.tic
149 149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
150 150 self.responses.add(msg[0])
151 151 elif msg[1] == last:
152 152 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
153 153 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
154 154 self.responses.add(msg[0])
155 155 else:
156 156 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
157 157 (msg[1],self.lifetime))
158 158
159 159
160 160 if __name__ == '__main__':
161 161 loop = ioloop.IOLoop.instance()
162 162 context = zmq.Context()
163 163 pub = context.socket(zmq.PUB)
164 164 pub.bind('tcp://127.0.0.1:5555')
165 165 xrep = context.socket(zmq.ROUTER)
166 166 xrep.bind('tcp://127.0.0.1:5556')
167 167
168 168 outstream = zmqstream.ZMQStream(pub, loop)
169 169 instream = zmqstream.ZMQStream(xrep, loop)
170 170
171 171 hb = HeartMonitor(loop, outstream, instream)
172 172
173 173 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now