##// END OF EJS Templates
Merge pull request #6363 from minrk/debug-heartbeat...
Thomas Kluyver -
r17771:5efa769d merge
parent child Browse files
Show More
@@ -1,114 +1,111 b''
1 1 """A basic kernel monitor with autorestarting.
2 2
3 3 This watches a kernel's state using KernelManager.is_alive and auto
4 4 restarts the kernel if it dies.
5 5
6 6 It is an incomplete base class, and must be subclassed.
7 7 """
8 8
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2013 The IPython Development Team
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
9 # Copyright (c) IPython Development Team.
10 # Distributed under the terms of the Modified BSD License.
19 11
20 12 from IPython.config.configurable import LoggingConfigurable
21 13 from IPython.utils.traitlets import (
22 14 Instance, Float, Dict, Bool, Integer,
23 15 )
24 16
25 #-----------------------------------------------------------------------------
26 # Code
27 #-----------------------------------------------------------------------------
28 17
29 18 class KernelRestarter(LoggingConfigurable):
30 19 """Monitor and autorestart a kernel."""
31 20
32 21 kernel_manager = Instance('IPython.kernel.KernelManager')
33
22
23 debug = Bool(False, config=True,
24 help="""Whether to include every poll event in debugging output.
25
26 Has to be set explicitly, because there will be *a lot* of output.
27 """
28 )
29
34 30 time_to_dead = Float(3.0, config=True,
35 31 help="""Kernel heartbeat interval in seconds."""
36 32 )
37 33
38 34 restart_limit = Integer(5, config=True,
39 35 help="""The number of consecutive autorestarts before the kernel is presumed dead."""
40 36 )
41 37 _restarting = Bool(False)
42 38 _restart_count = Integer(0)
43 39
44 40 callbacks = Dict()
45 41 def _callbacks_default(self):
46 42 return dict(restart=[], dead=[])
47 43
48 44 def start(self):
49 45 """Start the polling of the kernel."""
50 46 raise NotImplementedError("Must be implemented in a subclass")
51 47
52 48 def stop(self):
53 49 """Stop the kernel polling."""
54 50 raise NotImplementedError("Must be implemented in a subclass")
55 51
56 52 def add_callback(self, f, event='restart'):
57 53 """register a callback to fire on a particular event
58 54
59 55 Possible values for event:
60 56
61 57 'restart' (default): kernel has died, and will be restarted.
62 58 'dead': restart has failed, kernel will be left dead.
63 59
64 60 """
65 61 self.callbacks[event].append(f)
66 62
67 63 def remove_callback(self, f, event='restart'):
68 64 """unregister a callback to fire on a particular event
69 65
70 66 Possible values for event:
71 67
72 68 'restart' (default): kernel has died, and will be restarted.
73 69 'dead': restart has failed, kernel will be left dead.
74 70
75 71 """
76 72 try:
77 73 self.callbacks[event].remove(f)
78 74 except ValueError:
79 75 pass
80 76
81 77 def _fire_callbacks(self, event):
82 78 """fire our callbacks for a particular event"""
83 79 for callback in self.callbacks[event]:
84 80 try:
85 81 callback()
86 82 except Exception as e:
87 83 self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True)
88 84
89 85 def poll(self):
90 self.log.debug('Polling kernel...')
86 if self.debug:
87 self.log.debug('Polling kernel...')
91 88 if not self.kernel_manager.is_alive():
92 89 if self._restarting:
93 90 self._restart_count += 1
94 91 else:
95 92 self._restart_count = 1
96 93
97 94 if self._restart_count >= self.restart_limit:
98 95 self.log.warn("KernelRestarter: restart failed")
99 96 self._fire_callbacks('dead')
100 97 self._restarting = False
101 98 self._restart_count = 0
102 99 self.stop()
103 100 else:
104 101 self.log.info('KernelRestarter: restarting kernel (%i/%i)',
105 102 self._restart_count,
106 103 self.restart_limit
107 104 )
108 105 self._fire_callbacks('restart')
109 106 self.kernel_manager.restart_kernel(now=True)
110 107 self._restarting = True
111 108 else:
112 109 if self._restarting:
113 110 self.log.debug("KernelRestarter: restart apparently succeeded")
114 111 self._restarting = False
@@ -1,192 +1,193 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their DEALER identities.
5
6 Authors:
7
8 * Min RK
9 5 """
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
6
7 # Copyright (c) IPython Development Team.
8 # Distributed under the terms of the Modified BSD License.
16 9
17 10 from __future__ import print_function
18 11 import time
19 12 import uuid
20 13
21 14 import zmq
22 15 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
23 16 from zmq.eventloop import ioloop, zmqstream
24 17
25 18 from IPython.config.configurable import LoggingConfigurable
26 19 from IPython.utils.py3compat import str_to_bytes
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict
20 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool
28 21
29 22 from IPython.parallel.util import log_errors
30 23
31 24 class Heart(object):
32 25 """A basic heart object for responding to a HeartMonitor.
33 26 This is a simple wrapper with defaults for the most common
34 27 Device model for responding to heartbeats.
35 28
36 29 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
37 30 SUB/DEALER for in/out.
38 31
39 32 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
40 33 device=None
41 34 id=None
42 35 def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
43 36 if mon_addr is None:
44 37 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
45 38 else:
46 39 self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
47 40 # do not allow the device to share global Context.instance,
48 41 # which is the default behavior in pyzmq > 2.1.10
49 42 self.device.context_factory = zmq.Context
50 43
51 44 self.device.daemon=True
52 45 self.device.connect_in(in_addr)
53 46 self.device.connect_out(out_addr)
54 47 if mon_addr is not None:
55 48 self.device.connect_mon(mon_addr)
56 49 if in_type == zmq.SUB:
57 50 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
58 51 if heart_id is None:
59 52 heart_id = uuid.uuid4().bytes
60 53 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
61 54 self.id = heart_id
62 55
63 56 def start(self):
64 57 return self.device.start()
65 58
66 59
67 60 class HeartMonitor(LoggingConfigurable):
68 61 """A basic HeartMonitor class
69 62 pingstream: a PUB stream
70 63 pongstream: an ROUTER stream
71 64 period: the period of the heartbeat in milliseconds"""
72
65
66 debug = Bool(False, config=True,
67 help="""Whether to include every heartbeat in debugging output.
68
69 Has to be set explicitly, because there will be *a lot* of output.
70 """
71 )
73 72 period = Integer(3000, config=True,
74 73 help='The frequency at which the Hub pings the engines for heartbeats '
75 74 '(in ms)',
76 75 )
77 76 max_heartmonitor_misses = Integer(10, config=True,
78 77 help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
79 78 )
80 79
81 80 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
82 81 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
83 82 loop = Instance('zmq.eventloop.ioloop.IOLoop')
84 83 def _loop_default(self):
85 84 return ioloop.IOLoop.instance()
86 85
87 86 # not settable:
88 87 hearts=Set()
89 88 responses=Set()
90 89 on_probation=Dict()
91 90 last_ping=CFloat(0)
92 91 _new_handlers = Set()
93 92 _failure_handlers = Set()
94 93 lifetime = CFloat(0)
95 94 tic = CFloat(0)
96 95
97 96 def __init__(self, **kwargs):
98 97 super(HeartMonitor, self).__init__(**kwargs)
99 98
100 99 self.pongstream.on_recv(self.handle_pong)
101 100
102 101 def start(self):
103 102 self.tic = time.time()
104 103 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
105 104 self.caller.start()
106 105
107 106 def add_new_heart_handler(self, handler):
108 107 """add a new handler for new hearts"""
109 108 self.log.debug("heartbeat::new_heart_handler: %s", handler)
110 109 self._new_handlers.add(handler)
111 110
112 111 def add_heart_failure_handler(self, handler):
113 112 """add a new handler for heart failure"""
114 113 self.log.debug("heartbeat::new heart failure handler: %s", handler)
115 114 self._failure_handlers.add(handler)
116 115
117 116 def beat(self):
118 117 self.pongstream.flush()
119 118 self.last_ping = self.lifetime
120 119
121 120 toc = time.time()
122 121 self.lifetime += toc-self.tic
123 122 self.tic = toc
124 self.log.debug("heartbeat::sending %s", self.lifetime)
123 if self.debug:
124 self.log.debug("heartbeat::sending %s", self.lifetime)
125 125 goodhearts = self.hearts.intersection(self.responses)
126 126 missed_beats = self.hearts.difference(goodhearts)
127 127 newhearts = self.responses.difference(goodhearts)
128 128 for heart in newhearts:
129 129 self.handle_new_heart(heart)
130 130 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
131 131 self.hearts)
132 132 for failure in heartfailures:
133 133 self.handle_heart_failure(failure)
134 134 self.on_probation = on_probation
135 135 self.responses = set()
136 136 #print self.on_probation, self.hearts
137 137 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
138 138 self.pingstream.send(str_to_bytes(str(self.lifetime)))
139 139 # flush stream to force immediate socket send
140 140 self.pingstream.flush()
141 141
142 142 def _check_missed(self, missed_beats, on_probation, hearts):
143 143 """Update heartbeats on probation, identifying any that have too many misses.
144 144 """
145 145 failures = []
146 146 new_probation = {}
147 147 for cur_heart in (b for b in missed_beats if b in hearts):
148 148 miss_count = on_probation.get(cur_heart, 0) + 1
149 149 self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count))
150 150 if miss_count > self.max_heartmonitor_misses:
151 151 failures.append(cur_heart)
152 152 else:
153 153 new_probation[cur_heart] = miss_count
154 154 return failures, new_probation
155 155
156 156 def handle_new_heart(self, heart):
157 157 if self._new_handlers:
158 158 for handler in self._new_handlers:
159 159 handler(heart)
160 160 else:
161 161 self.log.info("heartbeat::yay, got new heart %s!", heart)
162 162 self.hearts.add(heart)
163 163
164 164 def handle_heart_failure(self, heart):
165 165 if self._failure_handlers:
166 166 for handler in self._failure_handlers:
167 167 try:
168 168 handler(heart)
169 169 except Exception as e:
170 170 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
171 171 pass
172 172 else:
173 173 self.log.info("heartbeat::Heart %s failed :(", heart)
174 174 self.hearts.remove(heart)
175 175
176 176
177 177 @log_errors
178 178 def handle_pong(self, msg):
179 179 "a heart just beat"
180 180 current = str_to_bytes(str(self.lifetime))
181 181 last = str_to_bytes(str(self.last_ping))
182 182 if msg[1] == current:
183 183 delta = time.time()-self.tic
184 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
184 if self.debug:
185 self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
185 186 self.responses.add(msg[0])
186 187 elif msg[1] == last:
187 188 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
188 189 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
189 190 self.responses.add(msg[0])
190 191 else:
191 192 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
192 193
General Comments 0
You need to be logged in to leave comments. Login now