Show More
@@ -1,114 +1,111 b'' | |||
|
1 | 1 | """A basic kernel monitor with autorestarting. |
|
2 | 2 | |
|
3 | 3 | This watches a kernel's state using KernelManager.is_alive and auto |
|
4 | 4 | restarts the kernel if it dies. |
|
5 | 5 | |
|
6 | 6 | It is an incomplete base class, and must be subclassed. |
|
7 | 7 | """ |
|
8 | 8 | |
|
9 | #----------------------------------------------------------------------------- | |
|
10 | # Copyright (C) 2013 The IPython Development Team | |
|
11 | # | |
|
12 | # Distributed under the terms of the BSD License. The full license is in | |
|
13 | # the file COPYING, distributed as part of this software. | |
|
14 | #----------------------------------------------------------------------------- | |
|
15 | ||
|
16 | #----------------------------------------------------------------------------- | |
|
17 | # Imports | |
|
18 | #----------------------------------------------------------------------------- | |
|
9 | # Copyright (c) IPython Development Team. | |
|
10 | # Distributed under the terms of the Modified BSD License. | |
|
19 | 11 | |
|
20 | 12 | from IPython.config.configurable import LoggingConfigurable |
|
21 | 13 | from IPython.utils.traitlets import ( |
|
22 | 14 | Instance, Float, Dict, Bool, Integer, |
|
23 | 15 | ) |
|
24 | 16 | |
|
25 | #----------------------------------------------------------------------------- | |
|
26 | # Code | |
|
27 | #----------------------------------------------------------------------------- | |
|
28 | 17 | |
|
29 | 18 | class KernelRestarter(LoggingConfigurable): |
|
30 | 19 | """Monitor and autorestart a kernel.""" |
|
31 | 20 | |
|
32 | 21 | kernel_manager = Instance('IPython.kernel.KernelManager') |
|
33 | ||
|
22 | ||
|
23 | debug = Bool(False, config=True, | |
|
24 | help="""Whether to include every poll event in debugging output. | |
|
25 | ||
|
26 | Has to be set explicitly, because there will be *a lot* of output. | |
|
27 | """ | |
|
28 | ) | |
|
29 | ||
|
34 | 30 | time_to_dead = Float(3.0, config=True, |
|
35 | 31 | help="""Kernel heartbeat interval in seconds.""" |
|
36 | 32 | ) |
|
37 | 33 | |
|
38 | 34 | restart_limit = Integer(5, config=True, |
|
39 | 35 | help="""The number of consecutive autorestarts before the kernel is presumed dead.""" |
|
40 | 36 | ) |
|
41 | 37 | _restarting = Bool(False) |
|
42 | 38 | _restart_count = Integer(0) |
|
43 | 39 | |
|
44 | 40 | callbacks = Dict() |
|
45 | 41 | def _callbacks_default(self): |
|
46 | 42 | return dict(restart=[], dead=[]) |
|
47 | 43 | |
|
48 | 44 | def start(self): |
|
49 | 45 | """Start the polling of the kernel.""" |
|
50 | 46 | raise NotImplementedError("Must be implemented in a subclass") |
|
51 | 47 | |
|
52 | 48 | def stop(self): |
|
53 | 49 | """Stop the kernel polling.""" |
|
54 | 50 | raise NotImplementedError("Must be implemented in a subclass") |
|
55 | 51 | |
|
56 | 52 | def add_callback(self, f, event='restart'): |
|
57 | 53 | """register a callback to fire on a particular event |
|
58 | 54 | |
|
59 | 55 | Possible values for event: |
|
60 | 56 | |
|
61 | 57 | 'restart' (default): kernel has died, and will be restarted. |
|
62 | 58 | 'dead': restart has failed, kernel will be left dead. |
|
63 | 59 | |
|
64 | 60 | """ |
|
65 | 61 | self.callbacks[event].append(f) |
|
66 | 62 | |
|
67 | 63 | def remove_callback(self, f, event='restart'): |
|
68 | 64 | """unregister a callback to fire on a particular event |
|
69 | 65 | |
|
70 | 66 | Possible values for event: |
|
71 | 67 | |
|
72 | 68 | 'restart' (default): kernel has died, and will be restarted. |
|
73 | 69 | 'dead': restart has failed, kernel will be left dead. |
|
74 | 70 | |
|
75 | 71 | """ |
|
76 | 72 | try: |
|
77 | 73 | self.callbacks[event].remove(f) |
|
78 | 74 | except ValueError: |
|
79 | 75 | pass |
|
80 | 76 | |
|
81 | 77 | def _fire_callbacks(self, event): |
|
82 | 78 | """fire our callbacks for a particular event""" |
|
83 | 79 | for callback in self.callbacks[event]: |
|
84 | 80 | try: |
|
85 | 81 | callback() |
|
86 | 82 | except Exception as e: |
|
87 | 83 | self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True) |
|
88 | 84 | |
|
89 | 85 | def poll(self): |
|
90 | self.log.debug('Polling kernel...') | |
|
86 | if self.debug: | |
|
87 | self.log.debug('Polling kernel...') | |
|
91 | 88 | if not self.kernel_manager.is_alive(): |
|
92 | 89 | if self._restarting: |
|
93 | 90 | self._restart_count += 1 |
|
94 | 91 | else: |
|
95 | 92 | self._restart_count = 1 |
|
96 | 93 | |
|
97 | 94 | if self._restart_count >= self.restart_limit: |
|
98 | 95 | self.log.warn("KernelRestarter: restart failed") |
|
99 | 96 | self._fire_callbacks('dead') |
|
100 | 97 | self._restarting = False |
|
101 | 98 | self._restart_count = 0 |
|
102 | 99 | self.stop() |
|
103 | 100 | else: |
|
104 | 101 | self.log.info('KernelRestarter: restarting kernel (%i/%i)', |
|
105 | 102 | self._restart_count, |
|
106 | 103 | self.restart_limit |
|
107 | 104 | ) |
|
108 | 105 | self._fire_callbacks('restart') |
|
109 | 106 | self.kernel_manager.restart_kernel(now=True) |
|
110 | 107 | self._restarting = True |
|
111 | 108 | else: |
|
112 | 109 | if self._restarting: |
|
113 | 110 | self.log.debug("KernelRestarter: restart apparently succeeded") |
|
114 | 111 | self._restarting = False |
@@ -1,192 +1,193 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | """ |
|
3 | 3 | A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB, |
|
4 | 4 | and hearts are tracked based on their DEALER identities. |
|
5 | ||
|
6 | Authors: | |
|
7 | ||
|
8 | * Min RK | |
|
9 | 5 | """ |
|
10 | #----------------------------------------------------------------------------- | |
|
11 |
# |
|
|
12 | # | |
|
13 | # Distributed under the terms of the BSD License. The full license is in | |
|
14 | # the file COPYING, distributed as part of this software. | |
|
15 | #----------------------------------------------------------------------------- | |
|
6 | ||
|
7 | # Copyright (c) IPython Development Team. | |
|
8 | # Distributed under the terms of the Modified BSD License. | |
|
16 | 9 | |
|
17 | 10 | from __future__ import print_function |
|
18 | 11 | import time |
|
19 | 12 | import uuid |
|
20 | 13 | |
|
21 | 14 | import zmq |
|
22 | 15 | from zmq.devices import ThreadDevice, ThreadMonitoredQueue |
|
23 | 16 | from zmq.eventloop import ioloop, zmqstream |
|
24 | 17 | |
|
25 | 18 | from IPython.config.configurable import LoggingConfigurable |
|
26 | 19 | from IPython.utils.py3compat import str_to_bytes |
|
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict | |
|
20 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool | |
|
28 | 21 | |
|
29 | 22 | from IPython.parallel.util import log_errors |
|
30 | 23 | |
|
31 | 24 | class Heart(object): |
|
32 | 25 | """A basic heart object for responding to a HeartMonitor. |
|
33 | 26 | This is a simple wrapper with defaults for the most common |
|
34 | 27 | Device model for responding to heartbeats. |
|
35 | 28 | |
|
36 | 29 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using |
|
37 | 30 | SUB/DEALER for in/out. |
|
38 | 31 | |
|
39 | 32 | You can specify the DEALER's IDENTITY via the optional heart_id argument.""" |
|
40 | 33 | device=None |
|
41 | 34 | id=None |
|
42 | 35 | def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None): |
|
43 | 36 | if mon_addr is None: |
|
44 | 37 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) |
|
45 | 38 | else: |
|
46 | 39 | self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"") |
|
47 | 40 | # do not allow the device to share global Context.instance, |
|
48 | 41 | # which is the default behavior in pyzmq > 2.1.10 |
|
49 | 42 | self.device.context_factory = zmq.Context |
|
50 | 43 | |
|
51 | 44 | self.device.daemon=True |
|
52 | 45 | self.device.connect_in(in_addr) |
|
53 | 46 | self.device.connect_out(out_addr) |
|
54 | 47 | if mon_addr is not None: |
|
55 | 48 | self.device.connect_mon(mon_addr) |
|
56 | 49 | if in_type == zmq.SUB: |
|
57 | 50 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
58 | 51 | if heart_id is None: |
|
59 | 52 | heart_id = uuid.uuid4().bytes |
|
60 | 53 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
61 | 54 | self.id = heart_id |
|
62 | 55 | |
|
63 | 56 | def start(self): |
|
64 | 57 | return self.device.start() |
|
65 | 58 | |
|
66 | 59 | |
|
67 | 60 | class HeartMonitor(LoggingConfigurable): |
|
68 | 61 | """A basic HeartMonitor class |
|
69 | 62 | pingstream: a PUB stream |
|
70 | 63 | pongstream: an ROUTER stream |
|
71 | 64 | period: the period of the heartbeat in milliseconds""" |
|
72 | ||
|
65 | ||
|
66 | debug = Bool(False, config=True, | |
|
67 | help="""Whether to include every heartbeat in debugging output. | |
|
68 | ||
|
69 | Has to be set explicitly, because there will be *a lot* of output. | |
|
70 | """ | |
|
71 | ) | |
|
73 | 72 | period = Integer(3000, config=True, |
|
74 | 73 | help='The frequency at which the Hub pings the engines for heartbeats ' |
|
75 | 74 | '(in ms)', |
|
76 | 75 | ) |
|
77 | 76 | max_heartmonitor_misses = Integer(10, config=True, |
|
78 | 77 | help='Allowed consecutive missed pings from controller Hub to engine before unregistering.', |
|
79 | 78 | ) |
|
80 | 79 | |
|
81 | 80 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
82 | 81 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
83 | 82 | loop = Instance('zmq.eventloop.ioloop.IOLoop') |
|
84 | 83 | def _loop_default(self): |
|
85 | 84 | return ioloop.IOLoop.instance() |
|
86 | 85 | |
|
87 | 86 | # not settable: |
|
88 | 87 | hearts=Set() |
|
89 | 88 | responses=Set() |
|
90 | 89 | on_probation=Dict() |
|
91 | 90 | last_ping=CFloat(0) |
|
92 | 91 | _new_handlers = Set() |
|
93 | 92 | _failure_handlers = Set() |
|
94 | 93 | lifetime = CFloat(0) |
|
95 | 94 | tic = CFloat(0) |
|
96 | 95 | |
|
97 | 96 | def __init__(self, **kwargs): |
|
98 | 97 | super(HeartMonitor, self).__init__(**kwargs) |
|
99 | 98 | |
|
100 | 99 | self.pongstream.on_recv(self.handle_pong) |
|
101 | 100 | |
|
102 | 101 | def start(self): |
|
103 | 102 | self.tic = time.time() |
|
104 | 103 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) |
|
105 | 104 | self.caller.start() |
|
106 | 105 | |
|
107 | 106 | def add_new_heart_handler(self, handler): |
|
108 | 107 | """add a new handler for new hearts""" |
|
109 | 108 | self.log.debug("heartbeat::new_heart_handler: %s", handler) |
|
110 | 109 | self._new_handlers.add(handler) |
|
111 | 110 | |
|
112 | 111 | def add_heart_failure_handler(self, handler): |
|
113 | 112 | """add a new handler for heart failure""" |
|
114 | 113 | self.log.debug("heartbeat::new heart failure handler: %s", handler) |
|
115 | 114 | self._failure_handlers.add(handler) |
|
116 | 115 | |
|
117 | 116 | def beat(self): |
|
118 | 117 | self.pongstream.flush() |
|
119 | 118 | self.last_ping = self.lifetime |
|
120 | 119 | |
|
121 | 120 | toc = time.time() |
|
122 | 121 | self.lifetime += toc-self.tic |
|
123 | 122 | self.tic = toc |
|
124 | self.log.debug("heartbeat::sending %s", self.lifetime) | |
|
123 | if self.debug: | |
|
124 | self.log.debug("heartbeat::sending %s", self.lifetime) | |
|
125 | 125 | goodhearts = self.hearts.intersection(self.responses) |
|
126 | 126 | missed_beats = self.hearts.difference(goodhearts) |
|
127 | 127 | newhearts = self.responses.difference(goodhearts) |
|
128 | 128 | for heart in newhearts: |
|
129 | 129 | self.handle_new_heart(heart) |
|
130 | 130 | heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation, |
|
131 | 131 | self.hearts) |
|
132 | 132 | for failure in heartfailures: |
|
133 | 133 | self.handle_heart_failure(failure) |
|
134 | 134 | self.on_probation = on_probation |
|
135 | 135 | self.responses = set() |
|
136 | 136 | #print self.on_probation, self.hearts |
|
137 | 137 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
138 | 138 | self.pingstream.send(str_to_bytes(str(self.lifetime))) |
|
139 | 139 | # flush stream to force immediate socket send |
|
140 | 140 | self.pingstream.flush() |
|
141 | 141 | |
|
142 | 142 | def _check_missed(self, missed_beats, on_probation, hearts): |
|
143 | 143 | """Update heartbeats on probation, identifying any that have too many misses. |
|
144 | 144 | """ |
|
145 | 145 | failures = [] |
|
146 | 146 | new_probation = {} |
|
147 | 147 | for cur_heart in (b for b in missed_beats if b in hearts): |
|
148 | 148 | miss_count = on_probation.get(cur_heart, 0) + 1 |
|
149 | 149 | self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count)) |
|
150 | 150 | if miss_count > self.max_heartmonitor_misses: |
|
151 | 151 | failures.append(cur_heart) |
|
152 | 152 | else: |
|
153 | 153 | new_probation[cur_heart] = miss_count |
|
154 | 154 | return failures, new_probation |
|
155 | 155 | |
|
156 | 156 | def handle_new_heart(self, heart): |
|
157 | 157 | if self._new_handlers: |
|
158 | 158 | for handler in self._new_handlers: |
|
159 | 159 | handler(heart) |
|
160 | 160 | else: |
|
161 | 161 | self.log.info("heartbeat::yay, got new heart %s!", heart) |
|
162 | 162 | self.hearts.add(heart) |
|
163 | 163 | |
|
164 | 164 | def handle_heart_failure(self, heart): |
|
165 | 165 | if self._failure_handlers: |
|
166 | 166 | for handler in self._failure_handlers: |
|
167 | 167 | try: |
|
168 | 168 | handler(heart) |
|
169 | 169 | except Exception as e: |
|
170 | 170 | self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True) |
|
171 | 171 | pass |
|
172 | 172 | else: |
|
173 | 173 | self.log.info("heartbeat::Heart %s failed :(", heart) |
|
174 | 174 | self.hearts.remove(heart) |
|
175 | 175 | |
|
176 | 176 | |
|
177 | 177 | @log_errors |
|
178 | 178 | def handle_pong(self, msg): |
|
179 | 179 | "a heart just beat" |
|
180 | 180 | current = str_to_bytes(str(self.lifetime)) |
|
181 | 181 | last = str_to_bytes(str(self.last_ping)) |
|
182 | 182 | if msg[1] == current: |
|
183 | 183 | delta = time.time()-self.tic |
|
184 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
|
184 | if self.debug: | |
|
185 | self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta) | |
|
185 | 186 | self.responses.add(msg[0]) |
|
186 | 187 | elif msg[1] == last: |
|
187 | 188 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
188 | 189 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta) |
|
189 | 190 | self.responses.add(msg[0]) |
|
190 | 191 | else: |
|
191 | 192 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime) |
|
192 | 193 |
General Comments 0
You need to be logged in to leave comments.
Login now