##// END OF EJS Templates
remove old debugging code
Jan Schulz -
Show More
@@ -1,189 +1,172 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their DEALER identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.py3compat import str_to_bytes
27 27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
28 28
29 29 from IPython.parallel.util import log_errors
30 30
31 31 class Heart(object):
32 32 """A basic heart object for responding to a HeartMonitor.
33 33 This is a simple wrapper with defaults for the most common
34 34 Device model for responding to heartbeats.
35 35
36 36 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
37 37 SUB/DEALER for in/out.
38 38
39 39 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
40 40 device=None
41 41 id=None
42 42 def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
43 43 if mon_addr is None:
44 44 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
45 45 else:
46 46 self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
47 47 # do not allow the device to share global Context.instance,
48 48 # which is the default behavior in pyzmq > 2.1.10
49 49 self.device.context_factory = zmq.Context
50 50
51 51 self.device.daemon=True
52 52 self.device.connect_in(in_addr)
53 53 self.device.connect_out(out_addr)
54 54 if mon_addr is not None:
55 55 self.device.connect_mon(mon_addr)
56 56 if in_type == zmq.SUB:
57 57 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
58 58 if heart_id is None:
59 59 heart_id = uuid.uuid4().bytes
60 60 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
61 61 self.id = heart_id
62 62
63 63 def start(self):
64 64 return self.device.start()
65 65
66 66
67 67 class HeartMonitor(LoggingConfigurable):
68 68 """A basic HeartMonitor class
69 69 pingstream: a PUB stream
70 70 pongstream: an ROUTER stream
71 71 period: the period of the heartbeat in milliseconds"""
72 72
73 73 period = Integer(3000, config=True,
74 74 help='The frequency at which the Hub pings the engines for heartbeats '
75 75 '(in ms)',
76 76 )
77 77
78 78 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
79 79 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
80 80 loop = Instance('zmq.eventloop.ioloop.IOLoop')
81 81 def _loop_default(self):
82 82 return ioloop.IOLoop.instance()
83 83
84 84 # not settable:
85 85 hearts=Set()
86 86 responses=Set()
87 87 on_probation=Set()
88 88 last_ping=CFloat(0)
89 89 _new_handlers = Set()
90 90 _failure_handlers = Set()
91 91 lifetime = CFloat(0)
92 92 tic = CFloat(0)
93 93
94 94 def __init__(self, **kwargs):
95 95 super(HeartMonitor, self).__init__(**kwargs)
96 96
97 97 self.pongstream.on_recv(self.handle_pong)
98 98
99 99 def start(self):
100 100 self.tic = time.time()
101 101 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
102 102 self.caller.start()
103 103
104 104 def add_new_heart_handler(self, handler):
105 105 """add a new handler for new hearts"""
106 106 self.log.debug("heartbeat::new_heart_handler: %s", handler)
107 107 self._new_handlers.add(handler)
108 108
109 109 def add_heart_failure_handler(self, handler):
110 110 """add a new handler for heart failure"""
111 111 self.log.debug("heartbeat::new heart failure handler: %s", handler)
112 112 self._failure_handlers.add(handler)
113 113
114 114 def beat(self):
115 115 self.pongstream.flush()
116 116 self.last_ping = self.lifetime
117 117
118 118 toc = time.time()
119 119 self.lifetime += toc-self.tic
120 120 self.tic = toc
121 121 self.log.debug("heartbeat::sending %s", self.lifetime)
122 122 goodhearts = self.hearts.intersection(self.responses)
123 123 missed_beats = self.hearts.difference(goodhearts)
124 124 heartfailures = self.on_probation.intersection(missed_beats)
125 125 newhearts = self.responses.difference(goodhearts)
126 126 map(self.handle_new_heart, newhearts)
127 127 map(self.handle_heart_failure, heartfailures)
128 128 self.on_probation = missed_beats.intersection(self.hearts)
129 129 self.responses = set()
130 130 #print self.on_probation, self.hearts
131 131 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
132 132 self.pingstream.send(str_to_bytes(str(self.lifetime)))
133 133 # flush stream to force immediate socket send
134 134 self.pingstream.flush()
135 135
136 136 def handle_new_heart(self, heart):
137 137 if self._new_handlers:
138 138 for handler in self._new_handlers:
139 139 handler(heart)
140 140 else:
141 141 self.log.info("heartbeat::yay, got new heart %s!", heart)
142 142 self.hearts.add(heart)
143 143
144 144 def handle_heart_failure(self, heart):
145 145 if self._failure_handlers:
146 146 for handler in self._failure_handlers:
147 147 try:
148 148 handler(heart)
149 149 except Exception as e:
150 150 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
151 151 pass
152 152 else:
153 153 self.log.info("heartbeat::Heart %s failed :(", heart)
154 154 self.hearts.remove(heart)
155 155
156 156
157 157 @log_errors
158 158 def handle_pong(self, msg):
159 159 "a heart just beat"
160 160 current = str_to_bytes(str(self.lifetime))
161 161 last = str_to_bytes(str(self.last_ping))
162 162 if msg[1] == current:
163 163 delta = time.time()-self.tic
164 164 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
165 165 self.responses.add(msg[0])
166 166 elif msg[1] == last:
167 167 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
168 168 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
169 169 self.responses.add(msg[0])
170 170 else:
171 171 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
172 172
173
174 if __name__ == '__main__':
175 loop = ioloop.IOLoop.instance()
176 context = zmq.Context()
177 pub = context.socket(zmq.PUB)
178 pub.bind('tcp://127.0.0.1:5555')
179 router = context.socket(zmq.ROUTER)
180 router.bind('tcp://127.0.0.1:5556')
181
182 outstream = zmqstream.ZMQStream(pub, loop)
183 instream = zmqstream.ZMQStream(router, loop)
184
185 hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
186 import logging
187 hb.log.setLevel(logging.DEBUG)
188 hb.start()
189 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now