##// END OF EJS Templates
HeartMonitor.period should be an Integer...
MinRK -
Show More
@@ -1,178 +1,178 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB socket,
4 4 and hearts are tracked based on their XREQ identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 27
28 28 from IPython.parallel.util import asbytes
29 29
class Heart(object):
    """Minimal heart that answers the pings of a HeartMonitor.

    Wraps a zmq.FORWARDER ThreadDevice with the defaults used for the
    common heartbeat setup: SUB for the incoming side and DEALER for the
    outgoing side.

    The IDENTITY of the outgoing socket can be chosen with the optional
    ``heart_id`` argument; a random UUID (as bytes) is used otherwise.
    """

    device = None
    id = None

    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
        self.device = forwarder = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        # Use a private Context for the device rather than the shared
        # Context.instance, which is the default behavior in pyzmq > 2.1.10.
        forwarder.context_factory = zmq.Context

        forwarder.daemon = True
        forwarder.connect_in(in_addr)
        forwarder.connect_out(out_addr)
        if in_type == zmq.SUB:
            # An empty prefix subscribes to every message.
            forwarder.setsockopt_in(zmq.SUBSCRIBE, b"")

        identity = uuid.uuid4().bytes if heart_id is None else heart_id
        forwarder.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
59 59
60 60
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class.

    pingstream: a PUB stream
    pongstream: an XREP stream
    period: the period of the heartbeat in milliseconds

    Each call to `beat` sends the current `lifetime` value on `pingstream`.
    Replies received on `pongstream` are compared against that value (and
    against the previous one) in `handle_pong` to decide which hearts
    responded.
    """

    period = Integer(1000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        """Default value for `loop`: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()               # hearts currently tracked
    responses = Set()            # hearts that answered since the last beat
    on_probation = Set()         # tracked hearts that missed the last beat
    last_ping = CFloat(0)        # `lifetime` value sent with the previous ping
    _new_handlers = Set()        # callbacks invoked for each new heart
    _failure_handlers = Set()    # callbacks invoked for each failed heart
    lifetime = CFloat(0)         # accumulated time between beats, sent as ping payload
    tic = CFloat(0)              # time.time() at start / at the most recent beat

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Record the start time and schedule `beat` every `period` ms on `loop`."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Settle the previous round of responses, then send a new ping.

        Hearts that responded but are not yet tracked are reported as new;
        tracked hearts that were already on probation and missed again are
        reported as failed; tracked hearts that missed this round are put
        on probation.
        """
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        self.log.debug("heartbeat::sending %s", self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)
        # Explicit loops instead of map(): on Python 3 map() is lazy, so
        # calling it only for side effects would never run the handlers.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)
        # Intersect with self.hearts again because failed hearts were just removed.
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        self.pingstream.send(asbytes(str(self.lifetime)))

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log if there are none) and track `heart`."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log if there are none) and stop tracking `heart`.

        An exception raised by a handler is logged and does not prevent the
        remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """a heart just beat

        `msg[0]` is used as the heart's identity and `msg[1]` as the echoed
        ping payload. A payload equal to the current or the previous ping
        counts as a response; anything else is logged and ignored.
        """
        current = asbytes(str(self.lifetime))
        last = asbytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Reply to the previous ping: add the interval between the two pings.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
163 163
164 164
if __name__ == '__main__':
    # Stand-alone demo: bind a PUB socket for pings and a ROUTER socket for
    # pongs on localhost, then run a HeartMonitor on the global IOLoop.
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.ROUTER)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor.__init__ takes keyword arguments only, so the streams
    # and the loop must be passed by trait name.
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
    # Schedule the periodic beat; without this the monitor never pings.
    hb.start()

    loop.start()
General Comments 0
You need to be logged in to leave comments. Login now