##// END OF EJS Templates
remove old debugging code
Jan Schulz -
Show More
@@ -1,189 +1,172 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their DEALER identities.
4 and hearts are tracked based on their DEALER identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
22 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 from IPython.utils.py3compat import str_to_bytes
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
28
28
29 from IPython.parallel.util import log_errors
29 from IPython.parallel.util import log_errors
30
30
class Heart(object):
    """Responder half of the heartbeat: forwards incoming pings back out.

    A thin convenience wrapper that builds a pyzmq background device and
    wires it up with the usual defaults: input arrives on a SUB socket and
    is forwarded out through a DEALER socket.

    Without ``mon_addr`` the device is a ``ThreadDevice`` of kind
    ``zmq.FORWARDER``; with ``mon_addr`` it is a ``ThreadMonitoredQueue``
    whose monitor socket (``mon_type``, PUB by default) connects to that
    address.

    The DEALER's IDENTITY may be supplied through the optional ``heart_id``
    argument; when omitted, the bytes of a random UUID are used.  The
    identity that was actually applied is available afterwards as ``id``.
    """
    device = None
    id = None

    def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB,
                 out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
        monitored = mon_addr is not None
        if monitored:
            forwarder = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
        else:
            forwarder = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        self.device = forwarder

        # Give the device a Context of its own: sharing the global
        # Context.instance is the default behavior in pyzmq > 2.1.10,
        # and is not wanted here.
        forwarder.context_factory = zmq.Context
        forwarder.daemon = True

        forwarder.connect_in(in_addr)
        forwarder.connect_out(out_addr)
        if monitored:
            forwarder.connect_mon(mon_addr)

        # A SUB socket receives nothing until it subscribes; the empty
        # prefix subscribes to every message.
        if in_type == zmq.SUB:
            forwarder.setsockopt_in(zmq.SUBSCRIBE, b"")

        identity = uuid.uuid4().bytes if heart_id is None else heart_id
        forwarder.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
65
65
66
66
class HeartMonitor(LoggingConfigurable):
    """Ping a set of hearts periodically and track which ones answer.

    pingstream: a PUB stream on which each ping is sent
    pongstream: a ROUTER stream on which the hearts' replies arrive
    period: the period of the heartbeat in milliseconds

    Every ping carries the monitor's current ``lifetime`` (seconds
    accumulated across beats) as its payload; a reply counts only if it
    echoes the current or the immediately preceding payload.  A known heart
    that misses one beat is put on probation; if it misses the next beat as
    well, it is reported to the failure handlers and forgotten.
    """

    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        """Default for ``loop``: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()             # identities of the hearts currently tracked
    responses = Set()          # identities that replied since the last beat
    on_probation = Set()       # tracked hearts that missed the previous beat
    last_ping = CFloat(0)      # payload (lifetime value) of the previous ping
    _new_handlers = Set()      # callables invoked with each new heart
    _failure_handlers = Set()  # callables invoked with each failed heart
    lifetime = CFloat(0)       # seconds accumulated over all beats so far
    tic = CFloat(0)            # time.time() taken at the most recent beat

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Start calling beat() every ``period`` milliseconds on ``loop``."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Settle the previous round of replies, then send the next ping."""
        # Process any replies still queued so they count for this round.
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        self.log.debug("heartbeat::sending %s", self.lifetime)

        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        # Only hearts that were already on probation and missed again fail.
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops rather than map(): on Python 3 map() returns a lazy
        # iterator, so a bare map(...) statement would never call the
        # handlers at all.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        # Failed hearts were just removed from self.hearts, so intersecting
        # keeps only the first-time offenders on probation.
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        self.pingstream.send(str_to_bytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or just log) and start tracking ``heart``."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or just log) and stop tracking ``heart``.

        An exception raised by a handler is logged with its traceback and
        does not prevent the remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)

    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is used as the heart's identity and ``msg[1]`` as the
        echoed ping payload.
        """
        current = str_to_bytes(str(self.lifetime))
        last = str_to_bytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Reply to the previous ping: still counts as a response, but
            # report how late it was (time since the last beat plus the
            # interval between the two pings).
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
172
172
173
if __name__ == '__main__':
    import logging

    # Manual smoke test: run a HeartMonitor on fixed local ports with
    # debug logging enabled, until the IOLoop is stopped.
    io_loop = ioloop.IOLoop.instance()
    ctx = zmq.Context()

    ping_socket = ctx.socket(zmq.PUB)
    ping_socket.bind('tcp://127.0.0.1:5555')
    pong_socket = ctx.socket(zmq.ROUTER)
    pong_socket.bind('tcp://127.0.0.1:5556')

    monitor = HeartMonitor(
        loop=io_loop,
        pingstream=zmqstream.ZMQStream(ping_socket, io_loop),
        pongstream=zmqstream.ZMQStream(pong_socket, io_loop),
    )
    monitor.log.setLevel(logging.DEBUG)
    monitor.start()
    io_loop.start()
General Comments 0
You need to be logged in to leave comments. Login now