##// END OF EJS Templates
relax default heartbeat period in IPython.parallel to 3s (from 1s)...
MinRK -
Show More
@@ -1,180 +1,180 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 # do not allow the device to share global Context.instance,
43 # do not allow the device to share global Context.instance,
44 # which is the default behavior in pyzmq > 2.1.10
44 # which is the default behavior in pyzmq > 2.1.10
45 self.device.context_factory = zmq.Context
45 self.device.context_factory = zmq.Context
46
46
47 self.device.daemon=True
47 self.device.daemon=True
48 self.device.connect_in(in_addr)
48 self.device.connect_in(in_addr)
49 self.device.connect_out(out_addr)
49 self.device.connect_out(out_addr)
50 if in_type == zmq.SUB:
50 if in_type == zmq.SUB:
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
52 if heart_id is None:
52 if heart_id is None:
53 heart_id = uuid.uuid4().bytes
53 heart_id = uuid.uuid4().bytes
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
55 self.id = heart_id
55 self.id = heart_id
56
56
57 def start(self):
57 def start(self):
58 return self.device.start()
58 return self.device.start()
59
59
60
60
61 class HeartMonitor(LoggingConfigurable):
61 class HeartMonitor(LoggingConfigurable):
62 """A basic HeartMonitor class
62 """A basic HeartMonitor class
63 pingstream: a PUB stream
63 pingstream: a PUB stream
64 pongstream: an XREP stream
64 pongstream: an XREP stream
65 period: the period of the heartbeat in milliseconds"""
65 period: the period of the heartbeat in milliseconds"""
66
66
67 period = Integer(1000, config=True,
67 period = Integer(3000, config=True,
68 help='The frequency at which the Hub pings the engines for heartbeats '
68 help='The frequency at which the Hub pings the engines for heartbeats '
69 '(in ms)',
69 '(in ms)',
70 )
70 )
71
71
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
75 def _loop_default(self):
75 def _loop_default(self):
76 return ioloop.IOLoop.instance()
76 return ioloop.IOLoop.instance()
77
77
78 # not settable:
78 # not settable:
79 hearts=Set()
79 hearts=Set()
80 responses=Set()
80 responses=Set()
81 on_probation=Set()
81 on_probation=Set()
82 last_ping=CFloat(0)
82 last_ping=CFloat(0)
83 _new_handlers = Set()
83 _new_handlers = Set()
84 _failure_handlers = Set()
84 _failure_handlers = Set()
85 lifetime = CFloat(0)
85 lifetime = CFloat(0)
86 tic = CFloat(0)
86 tic = CFloat(0)
87
87
88 def __init__(self, **kwargs):
88 def __init__(self, **kwargs):
89 super(HeartMonitor, self).__init__(**kwargs)
89 super(HeartMonitor, self).__init__(**kwargs)
90
90
91 self.pongstream.on_recv(self.handle_pong)
91 self.pongstream.on_recv(self.handle_pong)
92
92
93 def start(self):
93 def start(self):
94 self.tic = time.time()
94 self.tic = time.time()
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
96 self.caller.start()
96 self.caller.start()
97
97
98 def add_new_heart_handler(self, handler):
98 def add_new_heart_handler(self, handler):
99 """add a new handler for new hearts"""
99 """add a new handler for new hearts"""
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
101 self._new_handlers.add(handler)
101 self._new_handlers.add(handler)
102
102
103 def add_heart_failure_handler(self, handler):
103 def add_heart_failure_handler(self, handler):
104 """add a new handler for heart failure"""
104 """add a new handler for heart failure"""
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
106 self._failure_handlers.add(handler)
106 self._failure_handlers.add(handler)
107
107
108 def beat(self):
108 def beat(self):
109 self.pongstream.flush()
109 self.pongstream.flush()
110 self.last_ping = self.lifetime
110 self.last_ping = self.lifetime
111
111
112 toc = time.time()
112 toc = time.time()
113 self.lifetime += toc-self.tic
113 self.lifetime += toc-self.tic
114 self.tic = toc
114 self.tic = toc
115 self.log.debug("heartbeat::sending %s", self.lifetime)
115 self.log.debug("heartbeat::sending %s", self.lifetime)
116 goodhearts = self.hearts.intersection(self.responses)
116 goodhearts = self.hearts.intersection(self.responses)
117 missed_beats = self.hearts.difference(goodhearts)
117 missed_beats = self.hearts.difference(goodhearts)
118 heartfailures = self.on_probation.intersection(missed_beats)
118 heartfailures = self.on_probation.intersection(missed_beats)
119 newhearts = self.responses.difference(goodhearts)
119 newhearts = self.responses.difference(goodhearts)
120 map(self.handle_new_heart, newhearts)
120 map(self.handle_new_heart, newhearts)
121 map(self.handle_heart_failure, heartfailures)
121 map(self.handle_heart_failure, heartfailures)
122 self.on_probation = missed_beats.intersection(self.hearts)
122 self.on_probation = missed_beats.intersection(self.hearts)
123 self.responses = set()
123 self.responses = set()
124 # print self.on_probation, self.hearts
124 # print self.on_probation, self.hearts
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 # flush stream to force immediate socket send
127 # flush stream to force immediate socket send
128 self.pingstream.flush()
128 self.pingstream.flush()
129
129
130 def handle_new_heart(self, heart):
130 def handle_new_heart(self, heart):
131 if self._new_handlers:
131 if self._new_handlers:
132 for handler in self._new_handlers:
132 for handler in self._new_handlers:
133 handler(heart)
133 handler(heart)
134 else:
134 else:
135 self.log.info("heartbeat::yay, got new heart %s!", heart)
135 self.log.info("heartbeat::yay, got new heart %s!", heart)
136 self.hearts.add(heart)
136 self.hearts.add(heart)
137
137
138 def handle_heart_failure(self, heart):
138 def handle_heart_failure(self, heart):
139 if self._failure_handlers:
139 if self._failure_handlers:
140 for handler in self._failure_handlers:
140 for handler in self._failure_handlers:
141 try:
141 try:
142 handler(heart)
142 handler(heart)
143 except Exception as e:
143 except Exception as e:
144 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
144 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
145 pass
145 pass
146 else:
146 else:
147 self.log.info("heartbeat::Heart %s failed :(", heart)
147 self.log.info("heartbeat::Heart %s failed :(", heart)
148 self.hearts.remove(heart)
148 self.hearts.remove(heart)
149
149
150
150
151 def handle_pong(self, msg):
151 def handle_pong(self, msg):
152 "a heart just beat"
152 "a heart just beat"
153 current = asbytes(str(self.lifetime))
153 current = asbytes(str(self.lifetime))
154 last = asbytes(str(self.last_ping))
154 last = asbytes(str(self.last_ping))
155 if msg[1] == current:
155 if msg[1] == current:
156 delta = time.time()-self.tic
156 delta = time.time()-self.tic
157 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
157 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
158 self.responses.add(msg[0])
158 self.responses.add(msg[0])
159 elif msg[1] == last:
159 elif msg[1] == last:
160 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
160 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
161 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
161 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
162 self.responses.add(msg[0])
162 self.responses.add(msg[0])
163 else:
163 else:
164 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
164 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
165
165
166
166
167 if __name__ == '__main__':
167 if __name__ == '__main__':
168 loop = ioloop.IOLoop.instance()
168 loop = ioloop.IOLoop.instance()
169 context = zmq.Context()
169 context = zmq.Context()
170 pub = context.socket(zmq.PUB)
170 pub = context.socket(zmq.PUB)
171 pub.bind('tcp://127.0.0.1:5555')
171 pub.bind('tcp://127.0.0.1:5555')
172 xrep = context.socket(zmq.ROUTER)
172 xrep = context.socket(zmq.ROUTER)
173 xrep.bind('tcp://127.0.0.1:5556')
173 xrep.bind('tcp://127.0.0.1:5556')
174
174
175 outstream = zmqstream.ZMQStream(pub, loop)
175 outstream = zmqstream.ZMQStream(pub, loop)
176 instream = zmqstream.ZMQStream(xrep, loop)
176 instream = zmqstream.ZMQStream(xrep, loop)
177
177
178 hb = HeartMonitor(loop, outstream, instream)
178 hb = HeartMonitor(loop, outstream, instream)
179
179
180 loop.start()
180 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now