##// END OF EJS Templates
Merge pull request #6363 from minrk/debug-heartbeat...
Thomas Kluyver -
r17771:5efa769d merge
parent child Browse files
Show More
@@ -1,114 +1,111 b''
1 """A basic kernel monitor with autorestarting.
1 """A basic kernel monitor with autorestarting.
2
2
3 This watches a kernel's state using KernelManager.is_alive and auto
3 This watches a kernel's state using KernelManager.is_alive and auto
4 restarts the kernel if it dies.
4 restarts the kernel if it dies.
5
5
6 It is an incomplete base class, and must be subclassed.
6 It is an incomplete base class, and must be subclassed.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 # Copyright (c) IPython Development Team.
10 # Copyright (C) 2013 The IPython Development Team
10 # Distributed under the terms of the Modified BSD License.
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
19
11
20 from IPython.config.configurable import LoggingConfigurable
12 from IPython.config.configurable import LoggingConfigurable
21 from IPython.utils.traitlets import (
13 from IPython.utils.traitlets import (
22 Instance, Float, Dict, Bool, Integer,
14 Instance, Float, Dict, Bool, Integer,
23 )
15 )
24
16
25 #-----------------------------------------------------------------------------
26 # Code
27 #-----------------------------------------------------------------------------
28
17
class KernelRestarter(LoggingConfigurable):
    """Monitor and autorestart a kernel.

    This is an incomplete base class: ``start`` and ``stop`` raise
    NotImplementedError and must be provided by a subclass.  Nothing in this
    class calls ``poll`` itself; presumably the subclass's ``start`` arranges
    for it to be called periodically (confirm against the subclass).

    Callbacks can be registered for two events:

    'restart': the kernel was found dead and is about to be restarted.
    'dead': the restart limit was reached; the kernel is left dead.
    """

    kernel_manager = Instance('IPython.kernel.KernelManager')

    debug = Bool(False, config=True,
        help="""Whether to include every poll event in debugging output.

        Has to be set explicitly, because there will be *a lot* of output.
        """
    )

    # Not read anywhere in this class; presumably consumed by subclasses
    # when scheduling poll() -- confirm against the subclass.
    time_to_dead = Float(3.0, config=True,
        help="""Kernel heartbeat interval in seconds."""
    )

    restart_limit = Integer(5, config=True,
        help="""The number of consecutive autorestarts before the kernel is presumed dead."""
    )
    # True between a restart attempt and the next poll that finds the
    # kernel alive (or the point where we give up).
    _restarting = Bool(False)
    # Number of consecutive polls that found the kernel dead.
    _restart_count = Integer(0)

    # Maps event name -> list of zero-argument callables.
    callbacks = Dict()
    def _callbacks_default(self):
        return dict(restart=[], dead=[])

    def start(self):
        """Start the polling of the kernel."""
        raise NotImplementedError("Must be implemented in a subclass")

    def stop(self):
        """Stop the kernel polling."""
        raise NotImplementedError("Must be implemented in a subclass")

    def add_callback(self, f, event='restart'):
        """register a callback to fire on a particular event

        Possible values for event:

          'restart' (default): kernel has died, and will be restarted.
          'dead': restart has failed, kernel will be left dead.

        Any other event name raises KeyError.
        """
        self.callbacks[event].append(f)

    def remove_callback(self, f, event='restart'):
        """unregister a callback to fire on a particular event

        Possible values for event:

          'restart' (default): kernel has died, and will be restarted.
          'dead': restart has failed, kernel will be left dead.

        Removing a callback that was never registered is a no-op; an
        unknown event name still raises KeyError.
        """
        try:
            self.callbacks[event].remove(f)
        except ValueError:
            # f was not registered for this event: nothing to do.
            pass

    def _fire_callbacks(self, event):
        """fire our callbacks for a particular event

        Each callback is called with no arguments.  A failing callback is
        logged (with traceback) and does not prevent the remaining callbacks
        from running.
        """
        # Iterate over a snapshot: a callback may unregister itself (or
        # another callback) via remove_callback, and mutating the list while
        # iterating it would silently skip the following callback.
        for callback in list(self.callbacks[event]):
            try:
                callback()
            except Exception:
                self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True)

    def poll(self):
        """Check whether the kernel is alive, restarting it if it is not.

        If the kernel is dead, the consecutive-failure counter is bumped (or
        reset to 1 when this is the first failure after a healthy period).
        Once the counter reaches ``restart_limit`` the 'dead' callbacks fire,
        the state is reset and ``stop()`` is called; otherwise the 'restart'
        callbacks fire and the kernel manager is asked to restart the kernel
        immediately.  If the kernel is alive, any pending "restarting" state
        is cleared.
        """
        if self.debug:
            self.log.debug('Polling kernel...')
        if not self.kernel_manager.is_alive():
            if self._restarting:
                # Still dead after our previous restart attempt.
                self._restart_count += 1
            else:
                self._restart_count = 1

            if self._restart_count >= self.restart_limit:
                # Give up: no further restart is attempted on this poll.
                self.log.warning("KernelRestarter: restart failed")
                self._fire_callbacks('dead')
                self._restarting = False
                self._restart_count = 0
                self.stop()
            else:
                self.log.info('KernelRestarter: restarting kernel (%i/%i)',
                    self._restart_count,
                    self.restart_limit
                )
                self._fire_callbacks('restart')
                self.kernel_manager.restart_kernel(now=True)
                self._restarting = True
        else:
            if self._restarting:
                self.log.debug("KernelRestarter: restart apparently succeeded")
            self._restarting = False
@@ -1,192 +1,193 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their DEALER identities.
4 and hearts are tracked based on their DEALER identities.
5
6 Authors:
7
8 * Min RK
9 """
5 """
10 #-----------------------------------------------------------------------------
6
11 # Copyright (C) 2010-2011 The IPython Development Team
7 # Copyright (c) IPython Development Team.
12 #
8 # Distributed under the terms of the Modified BSD License.
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
16
9
17 from __future__ import print_function
10 from __future__ import print_function
18 import time
11 import time
19 import uuid
12 import uuid
20
13
21 import zmq
14 import zmq
22 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
15 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
23 from zmq.eventloop import ioloop, zmqstream
16 from zmq.eventloop import ioloop, zmqstream
24
17
25 from IPython.config.configurable import LoggingConfigurable
18 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
19 from IPython.utils.py3compat import str_to_bytes
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict
20 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool
28
21
29 from IPython.parallel.util import log_errors
22 from IPython.parallel.util import log_errors
30
23
class Heart(object):
    """A basic heart object for responding to a HeartMonitor.

    Thin wrapper, with defaults for the most common device model used to
    answer heartbeats: it builds a threadsafe zmq.FORWARDER Device, using
    SUB/DEALER for in/out unless told otherwise.  When ``mon_addr`` is given
    a ThreadMonitoredQueue is built instead and its monitor socket is
    connected to that address.

    The DEALER's IDENTITY can be chosen with the optional ``heart_id``
    argument; if omitted, the bytes of a fresh uuid4 are used.  The identity
    in use is available afterwards as ``self.id``.
    """
    device = None
    id = None

    def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
        monitored = mon_addr is not None
        if monitored:
            relay = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
        else:
            relay = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        self.device = relay

        # Use a private Context for the device rather than the shared global
        # Context.instance, which is the default behavior in pyzmq > 2.1.10.
        relay.context_factory = zmq.Context
        relay.daemon = True

        relay.connect_in(in_addr)
        relay.connect_out(out_addr)
        if monitored:
            relay.connect_mon(mon_addr)

        if in_type == zmq.SUB:
            # Empty prefix: subscribe to everything.
            relay.setsockopt_in(zmq.SUBSCRIBE, b"")

        identity = uuid.uuid4().bytes if heart_id is None else heart_id
        relay.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
65
58
66
59
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class

    pingstream: a PUB stream
    pongstream: a ROUTER stream
    period: the period of the heartbeat in milliseconds

    On every beat the monitor sends its current ``lifetime`` (encoded as
    bytes) on ``pingstream``; replies arriving on ``pongstream`` are matched
    against that value in ``handle_pong``.  The first frame of a reply is
    used as the heart's identity.
    """

    debug = Bool(False, config=True,
        help="""Whether to include every heartbeat in debugging output.

        Has to be set explicitly, because there will be *a lot* of output.
        """
    )
    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )
    max_heartmonitor_misses = Integer(10, config=True,
        help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()              # identities currently tracked as alive
    responses = Set()           # identities that answered since the last beat
    on_probation = Dict()       # identity -> consecutive missed beats
    last_ping = CFloat(0)       # value of `lifetime` sent on the previous beat
    _new_handlers = Set()       # callables invoked with each new heart
    _failure_handlers = Set()   # callables invoked with each failed heart
    lifetime = CFloat(0)        # accumulated seconds between beats; ping payload
    tic = CFloat(0)             # time.time() at the most recent beat

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Begin calling ``beat`` every ``period`` milliseconds on ``loop``."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Process the replies to the previous ping, then send the next one.

        Responders not yet in ``hearts`` are reported as new hearts; tracked
        hearts that did not respond accumulate a miss and are reported as
        failures once they exceed ``max_heartmonitor_misses``.
        """
        # Make sure every pong already received has been handled before we
        # take stock of the responses.
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        if self.debug:
            self.log.debug("heartbeat::sending %s", self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)
        for heart in newhearts:
            self.handle_new_heart(heart)
        heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
                                                         self.hearts)
        for failure in heartfailures:
            self.handle_heart_failure(failure)
        self.on_probation = on_probation
        self.responses = set()
        self.pingstream.send(str_to_bytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def _check_missed(self, missed_beats, on_probation, hearts):
        """Update heartbeats on probation, identifying any that have too many misses.

        Returns a ``(failures, new_probation)`` pair: ``failures`` lists the
        hearts whose miss count now exceeds ``max_heartmonitor_misses``, and
        ``new_probation`` maps every other missed heart to its updated miss
        count.  Hearts that did not miss this beat are absent from
        ``new_probation``, i.e. their count starts again from zero.
        """
        failures = []
        new_probation = {}
        for cur_heart in (b for b in missed_beats if b in hearts):
            miss_count = on_probation.get(cur_heart, 0) + 1
            self.log.info("heartbeat::missed %s : %s", cur_heart, miss_count)
            if miss_count > self.max_heartmonitor_misses:
                failures.append(cur_heart)
            else:
                new_probation[cur_heart] = miss_count
        return failures, new_probation

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or just log), then track ``heart``.

        Handler exceptions are not caught here: if one raises, it propagates
        to the caller and ``heart`` is not added.
        """
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or just log), then stop tracking ``heart``.

        A failing handler is logged with its traceback and does not prevent
        the remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)


    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is taken as the heart's identity and ``msg[1]`` as the
        echoed ping payload.  A reply to the current or the previous ping
        counts as a response; anything else is logged and ignored.
        """
        current = str_to_bytes(str(self.lifetime))
        last = str_to_bytes(str(self.last_ping))
        if msg[1] == current:
            delta = time.time() - self.tic
            if self.debug:
                self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Late reply to the previous ping: add the interval between the
            # two pings to the time elapsed since the latest one.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
192
193
General Comments 0
You need to be logged in to leave comments. Login now