##// END OF EJS Templates
relax default heartbeat period in IPython.parallel to 3s (from 1s)...
MinRK -
Show More
@@ -1,180 +1,180 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 27
28 28 from IPython.parallel.util import asbytes
29 29
30 30 class Heart(object):
31 31 """A basic heart object for responding to a HeartMonitor.
32 32 This is a simple wrapper with defaults for the most common
33 33 Device model for responding to heartbeats.
34 34
35 35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 36 SUB/XREQ for in/out.
37 37
38 38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 39 device=None
40 40 id=None
41 41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 43 # do not allow the device to share global Context.instance,
44 44 # which is the default behavior in pyzmq > 2.1.10
45 45 self.device.context_factory = zmq.Context
46 46
47 47 self.device.daemon=True
48 48 self.device.connect_in(in_addr)
49 49 self.device.connect_out(out_addr)
50 50 if in_type == zmq.SUB:
51 51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
52 52 if heart_id is None:
53 53 heart_id = uuid.uuid4().bytes
54 54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
55 55 self.id = heart_id
56 56
57 57 def start(self):
58 58 return self.device.start()
59 59
60 60
61 61 class HeartMonitor(LoggingConfigurable):
62 62 """A basic HeartMonitor class
63 63 pingstream: a PUB stream
64 64 pongstream: an XREP stream
65 65 period: the period of the heartbeat in milliseconds"""
66 66
67 period = Integer(1000, config=True,
67 period = Integer(3000, config=True,
68 68 help='The frequency at which the Hub pings the engines for heartbeats '
69 69 '(in ms)',
70 70 )
71 71
72 72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
74 74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
75 75 def _loop_default(self):
76 76 return ioloop.IOLoop.instance()
77 77
78 78 # not settable:
79 79 hearts=Set()
80 80 responses=Set()
81 81 on_probation=Set()
82 82 last_ping=CFloat(0)
83 83 _new_handlers = Set()
84 84 _failure_handlers = Set()
85 85 lifetime = CFloat(0)
86 86 tic = CFloat(0)
87 87
88 88 def __init__(self, **kwargs):
89 89 super(HeartMonitor, self).__init__(**kwargs)
90 90
91 91 self.pongstream.on_recv(self.handle_pong)
92 92
93 93 def start(self):
94 94 self.tic = time.time()
95 95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
96 96 self.caller.start()
97 97
98 98 def add_new_heart_handler(self, handler):
99 99 """add a new handler for new hearts"""
100 100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
101 101 self._new_handlers.add(handler)
102 102
103 103 def add_heart_failure_handler(self, handler):
104 104 """add a new handler for heart failure"""
105 105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
106 106 self._failure_handlers.add(handler)
107 107
108 108 def beat(self):
109 109 self.pongstream.flush()
110 110 self.last_ping = self.lifetime
111 111
112 112 toc = time.time()
113 113 self.lifetime += toc-self.tic
114 114 self.tic = toc
115 115 self.log.debug("heartbeat::sending %s", self.lifetime)
116 116 goodhearts = self.hearts.intersection(self.responses)
117 117 missed_beats = self.hearts.difference(goodhearts)
118 118 heartfailures = self.on_probation.intersection(missed_beats)
119 119 newhearts = self.responses.difference(goodhearts)
120 120 map(self.handle_new_heart, newhearts)
121 121 map(self.handle_heart_failure, heartfailures)
122 122 self.on_probation = missed_beats.intersection(self.hearts)
123 123 self.responses = set()
124 124 # print self.on_probation, self.hearts
125 125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 126 self.pingstream.send(asbytes(str(self.lifetime)))
127 127 # flush stream to force immediate socket send
128 128 self.pingstream.flush()
129 129
130 130 def handle_new_heart(self, heart):
131 131 if self._new_handlers:
132 132 for handler in self._new_handlers:
133 133 handler(heart)
134 134 else:
135 135 self.log.info("heartbeat::yay, got new heart %s!", heart)
136 136 self.hearts.add(heart)
137 137
138 138 def handle_heart_failure(self, heart):
139 139 if self._failure_handlers:
140 140 for handler in self._failure_handlers:
141 141 try:
142 142 handler(heart)
143 143 except Exception as e:
144 144 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
145 145 pass
146 146 else:
147 147 self.log.info("heartbeat::Heart %s failed :(", heart)
148 148 self.hearts.remove(heart)
149 149
150 150
151 151 def handle_pong(self, msg):
152 152 "a heart just beat"
153 153 current = asbytes(str(self.lifetime))
154 154 last = asbytes(str(self.last_ping))
155 155 if msg[1] == current:
156 156 delta = time.time()-self.tic
157 157 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
158 158 self.responses.add(msg[0])
159 159 elif msg[1] == last:
160 160 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
161 161 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
162 162 self.responses.add(msg[0])
163 163 else:
164 164 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
165 165
166 166
167 167 if __name__ == '__main__':
168 168 loop = ioloop.IOLoop.instance()
169 169 context = zmq.Context()
170 170 pub = context.socket(zmq.PUB)
171 171 pub.bind('tcp://127.0.0.1:5555')
172 172 xrep = context.socket(zmq.ROUTER)
173 173 xrep.bind('tcp://127.0.0.1:5556')
174 174
175 175 outstream = zmqstream.ZMQStream(pub, loop)
176 176 instream = zmqstream.ZMQStream(xrep, loop)
177 177
178 178 hb = HeartMonitor(loop, outstream, instream)
179 179
180 180 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now