Show More
@@ -1,114 +1,111 b'' | |||||
1 | """A basic kernel monitor with autorestarting. |
|
1 | """A basic kernel monitor with autorestarting. | |
2 |
|
2 | |||
3 | This watches a kernel's state using KernelManager.is_alive and auto |
|
3 | This watches a kernel's state using KernelManager.is_alive and auto | |
4 | restarts the kernel if it dies. |
|
4 | restarts the kernel if it dies. | |
5 |
|
5 | |||
6 | It is an incomplete base class, and must be subclassed. |
|
6 | It is an incomplete base class, and must be subclassed. | |
7 | """ |
|
7 | """ | |
8 |
|
8 | |||
9 | #----------------------------------------------------------------------------- |
|
9 | # Copyright (c) IPython Development Team. | |
10 | # Copyright (C) 2013 The IPython Development Team |
|
10 | # Distributed under the terms of the Modified BSD License. | |
11 | # |
|
|||
12 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
13 | # the file COPYING, distributed as part of this software. |
|
|||
14 | #----------------------------------------------------------------------------- |
|
|||
15 |
|
||||
16 | #----------------------------------------------------------------------------- |
|
|||
17 | # Imports |
|
|||
18 | #----------------------------------------------------------------------------- |
|
|||
19 |
|
11 | |||
20 | from IPython.config.configurable import LoggingConfigurable |
|
12 | from IPython.config.configurable import LoggingConfigurable | |
21 | from IPython.utils.traitlets import ( |
|
13 | from IPython.utils.traitlets import ( | |
22 | Instance, Float, Dict, Bool, Integer, |
|
14 | Instance, Float, Dict, Bool, Integer, | |
23 | ) |
|
15 | ) | |
24 |
|
16 | |||
25 | #----------------------------------------------------------------------------- |
|
|||
26 | # Code |
|
|||
27 | #----------------------------------------------------------------------------- |
|
|||
28 |
|
17 | |||
29 | class KernelRestarter(LoggingConfigurable): |
|
18 | class KernelRestarter(LoggingConfigurable): | |
30 | """Monitor and autorestart a kernel.""" |
|
19 | """Monitor and autorestart a kernel.""" | |
31 |
|
20 | |||
32 | kernel_manager = Instance('IPython.kernel.KernelManager') |
|
21 | kernel_manager = Instance('IPython.kernel.KernelManager') | |
33 |
|
22 | |||
|
23 | debug = Bool(False, config=True, | |||
|
24 | help="""Whether to include every poll event in debugging output. | |||
|
25 | ||||
|
26 | Has to be set explicitly, because there will be *a lot* of output. | |||
|
27 | """ | |||
|
28 | ) | |||
|
29 | ||||
34 | time_to_dead = Float(3.0, config=True, |
|
30 | time_to_dead = Float(3.0, config=True, | |
35 | help="""Kernel heartbeat interval in seconds.""" |
|
31 | help="""Kernel heartbeat interval in seconds.""" | |
36 | ) |
|
32 | ) | |
37 |
|
33 | |||
38 | restart_limit = Integer(5, config=True, |
|
34 | restart_limit = Integer(5, config=True, | |
39 | help="""The number of consecutive autorestarts before the kernel is presumed dead.""" |
|
35 | help="""The number of consecutive autorestarts before the kernel is presumed dead.""" | |
40 | ) |
|
36 | ) | |
41 | _restarting = Bool(False) |
|
37 | _restarting = Bool(False) | |
42 | _restart_count = Integer(0) |
|
38 | _restart_count = Integer(0) | |
43 |
|
39 | |||
44 | callbacks = Dict() |
|
40 | callbacks = Dict() | |
45 | def _callbacks_default(self): |
|
41 | def _callbacks_default(self): | |
46 | return dict(restart=[], dead=[]) |
|
42 | return dict(restart=[], dead=[]) | |
47 |
|
43 | |||
48 | def start(self): |
|
44 | def start(self): | |
49 | """Start the polling of the kernel.""" |
|
45 | """Start the polling of the kernel.""" | |
50 | raise NotImplementedError("Must be implemented in a subclass") |
|
46 | raise NotImplementedError("Must be implemented in a subclass") | |
51 |
|
47 | |||
52 | def stop(self): |
|
48 | def stop(self): | |
53 | """Stop the kernel polling.""" |
|
49 | """Stop the kernel polling.""" | |
54 | raise NotImplementedError("Must be implemented in a subclass") |
|
50 | raise NotImplementedError("Must be implemented in a subclass") | |
55 |
|
51 | |||
56 | def add_callback(self, f, event='restart'): |
|
52 | def add_callback(self, f, event='restart'): | |
57 | """register a callback to fire on a particular event |
|
53 | """register a callback to fire on a particular event | |
58 |
|
54 | |||
59 | Possible values for event: |
|
55 | Possible values for event: | |
60 |
|
56 | |||
61 | 'restart' (default): kernel has died, and will be restarted. |
|
57 | 'restart' (default): kernel has died, and will be restarted. | |
62 | 'dead': restart has failed, kernel will be left dead. |
|
58 | 'dead': restart has failed, kernel will be left dead. | |
63 |
|
59 | |||
64 | """ |
|
60 | """ | |
65 | self.callbacks[event].append(f) |
|
61 | self.callbacks[event].append(f) | |
66 |
|
62 | |||
67 | def remove_callback(self, f, event='restart'): |
|
63 | def remove_callback(self, f, event='restart'): | |
68 | """unregister a callback to fire on a particular event |
|
64 | """unregister a callback to fire on a particular event | |
69 |
|
65 | |||
70 | Possible values for event: |
|
66 | Possible values for event: | |
71 |
|
67 | |||
72 | 'restart' (default): kernel has died, and will be restarted. |
|
68 | 'restart' (default): kernel has died, and will be restarted. | |
73 | 'dead': restart has failed, kernel will be left dead. |
|
69 | 'dead': restart has failed, kernel will be left dead. | |
74 |
|
70 | |||
75 | """ |
|
71 | """ | |
76 | try: |
|
72 | try: | |
77 | self.callbacks[event].remove(f) |
|
73 | self.callbacks[event].remove(f) | |
78 | except ValueError: |
|
74 | except ValueError: | |
79 | pass |
|
75 | pass | |
80 |
|
76 | |||
81 | def _fire_callbacks(self, event): |
|
77 | def _fire_callbacks(self, event): | |
82 | """fire our callbacks for a particular event""" |
|
78 | """fire our callbacks for a particular event""" | |
83 | for callback in self.callbacks[event]: |
|
79 | for callback in self.callbacks[event]: | |
84 | try: |
|
80 | try: | |
85 | callback() |
|
81 | callback() | |
86 | except Exception as e: |
|
82 | except Exception as e: | |
87 | self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True) |
|
83 | self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True) | |
88 |
|
84 | |||
89 | def poll(self): |
|
85 | def poll(self): | |
|
86 | if self.debug: | |||
90 | self.log.debug('Polling kernel...') |
|
87 | self.log.debug('Polling kernel...') | |
91 | if not self.kernel_manager.is_alive(): |
|
88 | if not self.kernel_manager.is_alive(): | |
92 | if self._restarting: |
|
89 | if self._restarting: | |
93 | self._restart_count += 1 |
|
90 | self._restart_count += 1 | |
94 | else: |
|
91 | else: | |
95 | self._restart_count = 1 |
|
92 | self._restart_count = 1 | |
96 |
|
93 | |||
97 | if self._restart_count >= self.restart_limit: |
|
94 | if self._restart_count >= self.restart_limit: | |
98 | self.log.warn("KernelRestarter: restart failed") |
|
95 | self.log.warn("KernelRestarter: restart failed") | |
99 | self._fire_callbacks('dead') |
|
96 | self._fire_callbacks('dead') | |
100 | self._restarting = False |
|
97 | self._restarting = False | |
101 | self._restart_count = 0 |
|
98 | self._restart_count = 0 | |
102 | self.stop() |
|
99 | self.stop() | |
103 | else: |
|
100 | else: | |
104 | self.log.info('KernelRestarter: restarting kernel (%i/%i)', |
|
101 | self.log.info('KernelRestarter: restarting kernel (%i/%i)', | |
105 | self._restart_count, |
|
102 | self._restart_count, | |
106 | self.restart_limit |
|
103 | self.restart_limit | |
107 | ) |
|
104 | ) | |
108 | self._fire_callbacks('restart') |
|
105 | self._fire_callbacks('restart') | |
109 | self.kernel_manager.restart_kernel(now=True) |
|
106 | self.kernel_manager.restart_kernel(now=True) | |
110 | self._restarting = True |
|
107 | self._restarting = True | |
111 | else: |
|
108 | else: | |
112 | if self._restarting: |
|
109 | if self._restarting: | |
113 | self.log.debug("KernelRestarter: restart apparently succeeded") |
|
110 | self.log.debug("KernelRestarter: restart apparently succeeded") | |
114 | self._restarting = False |
|
111 | self._restarting = False |
@@ -1,192 +1,193 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """ |
|
2 | """ | |
3 | A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB, |
|
3 | A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB, | |
4 | and hearts are tracked based on their DEALER identities. |
|
4 | and hearts are tracked based on their DEALER identities. | |
5 |
|
||||
6 | Authors: |
|
|||
7 |
|
||||
8 | * Min RK |
|
|||
9 | """ |
|
5 | """ | |
10 | #----------------------------------------------------------------------------- |
|
6 | ||
11 | # |
|
7 | # Copyright (c) IPython Development Team. | |
12 | # |
|
8 | # Distributed under the terms of the Modified BSD License. | |
13 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
14 | # the file COPYING, distributed as part of this software. |
|
|||
15 | #----------------------------------------------------------------------------- |
|
|||
16 |
|
9 | |||
17 | from __future__ import print_function |
|
10 | from __future__ import print_function | |
18 | import time |
|
11 | import time | |
19 | import uuid |
|
12 | import uuid | |
20 |
|
13 | |||
21 | import zmq |
|
14 | import zmq | |
22 | from zmq.devices import ThreadDevice, ThreadMonitoredQueue |
|
15 | from zmq.devices import ThreadDevice, ThreadMonitoredQueue | |
23 | from zmq.eventloop import ioloop, zmqstream |
|
16 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
17 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
18 | from IPython.config.configurable import LoggingConfigurable | |
26 | from IPython.utils.py3compat import str_to_bytes |
|
19 | from IPython.utils.py3compat import str_to_bytes | |
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict |
|
20 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool | |
28 |
|
21 | |||
29 | from IPython.parallel.util import log_errors |
|
22 | from IPython.parallel.util import log_errors | |
30 |
|
23 | |||
31 | class Heart(object): |
|
24 | class Heart(object): | |
32 | """A basic heart object for responding to a HeartMonitor. |
|
25 | """A basic heart object for responding to a HeartMonitor. | |
33 | This is a simple wrapper with defaults for the most common |
|
26 | This is a simple wrapper with defaults for the most common | |
34 | Device model for responding to heartbeats. |
|
27 | Device model for responding to heartbeats. | |
35 |
|
28 | |||
36 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using |
|
29 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | |
37 | SUB/DEALER for in/out. |
|
30 | SUB/DEALER for in/out. | |
38 |
|
31 | |||
39 | You can specify the DEALER's IDENTITY via the optional heart_id argument.""" |
|
32 | You can specify the DEALER's IDENTITY via the optional heart_id argument.""" | |
40 | device=None |
|
33 | device=None | |
41 | id=None |
|
34 | id=None | |
42 | def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None): |
|
35 | def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None): | |
43 | if mon_addr is None: |
|
36 | if mon_addr is None: | |
44 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) |
|
37 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | |
45 | else: |
|
38 | else: | |
46 | self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"") |
|
39 | self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"") | |
47 | # do not allow the device to share global Context.instance, |
|
40 | # do not allow the device to share global Context.instance, | |
48 | # which is the default behavior in pyzmq > 2.1.10 |
|
41 | # which is the default behavior in pyzmq > 2.1.10 | |
49 | self.device.context_factory = zmq.Context |
|
42 | self.device.context_factory = zmq.Context | |
50 |
|
43 | |||
51 | self.device.daemon=True |
|
44 | self.device.daemon=True | |
52 | self.device.connect_in(in_addr) |
|
45 | self.device.connect_in(in_addr) | |
53 | self.device.connect_out(out_addr) |
|
46 | self.device.connect_out(out_addr) | |
54 | if mon_addr is not None: |
|
47 | if mon_addr is not None: | |
55 | self.device.connect_mon(mon_addr) |
|
48 | self.device.connect_mon(mon_addr) | |
56 | if in_type == zmq.SUB: |
|
49 | if in_type == zmq.SUB: | |
57 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
50 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
58 | if heart_id is None: |
|
51 | if heart_id is None: | |
59 | heart_id = uuid.uuid4().bytes |
|
52 | heart_id = uuid.uuid4().bytes | |
60 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
53 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |
61 | self.id = heart_id |
|
54 | self.id = heart_id | |
62 |
|
55 | |||
63 | def start(self): |
|
56 | def start(self): | |
64 | return self.device.start() |
|
57 | return self.device.start() | |
65 |
|
58 | |||
66 |
|
59 | |||
67 | class HeartMonitor(LoggingConfigurable): |
|
60 | class HeartMonitor(LoggingConfigurable): | |
68 | """A basic HeartMonitor class |
|
61 | """A basic HeartMonitor class | |
69 | pingstream: a PUB stream |
|
62 | pingstream: a PUB stream | |
70 | pongstream: an ROUTER stream |
|
63 | pongstream: an ROUTER stream | |
71 | period: the period of the heartbeat in milliseconds""" |
|
64 | period: the period of the heartbeat in milliseconds""" | |
72 |
|
65 | |||
|
66 | debug = Bool(False, config=True, | |||
|
67 | help="""Whether to include every heartbeat in debugging output. | |||
|
68 | ||||
|
69 | Has to be set explicitly, because there will be *a lot* of output. | |||
|
70 | """ | |||
|
71 | ) | |||
73 | period = Integer(3000, config=True, |
|
72 | period = Integer(3000, config=True, | |
74 | help='The frequency at which the Hub pings the engines for heartbeats ' |
|
73 | help='The frequency at which the Hub pings the engines for heartbeats ' | |
75 | '(in ms)', |
|
74 | '(in ms)', | |
76 | ) |
|
75 | ) | |
77 | max_heartmonitor_misses = Integer(10, config=True, |
|
76 | max_heartmonitor_misses = Integer(10, config=True, | |
78 | help='Allowed consecutive missed pings from controller Hub to engine before unregistering.', |
|
77 | help='Allowed consecutive missed pings from controller Hub to engine before unregistering.', | |
79 | ) |
|
78 | ) | |
80 |
|
79 | |||
81 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
80 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
82 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
81 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
83 | loop = Instance('zmq.eventloop.ioloop.IOLoop') |
|
82 | loop = Instance('zmq.eventloop.ioloop.IOLoop') | |
84 | def _loop_default(self): |
|
83 | def _loop_default(self): | |
85 | return ioloop.IOLoop.instance() |
|
84 | return ioloop.IOLoop.instance() | |
86 |
|
85 | |||
87 | # not settable: |
|
86 | # not settable: | |
88 | hearts=Set() |
|
87 | hearts=Set() | |
89 | responses=Set() |
|
88 | responses=Set() | |
90 | on_probation=Dict() |
|
89 | on_probation=Dict() | |
91 | last_ping=CFloat(0) |
|
90 | last_ping=CFloat(0) | |
92 | _new_handlers = Set() |
|
91 | _new_handlers = Set() | |
93 | _failure_handlers = Set() |
|
92 | _failure_handlers = Set() | |
94 | lifetime = CFloat(0) |
|
93 | lifetime = CFloat(0) | |
95 | tic = CFloat(0) |
|
94 | tic = CFloat(0) | |
96 |
|
95 | |||
97 | def __init__(self, **kwargs): |
|
96 | def __init__(self, **kwargs): | |
98 | super(HeartMonitor, self).__init__(**kwargs) |
|
97 | super(HeartMonitor, self).__init__(**kwargs) | |
99 |
|
98 | |||
100 | self.pongstream.on_recv(self.handle_pong) |
|
99 | self.pongstream.on_recv(self.handle_pong) | |
101 |
|
100 | |||
102 | def start(self): |
|
101 | def start(self): | |
103 | self.tic = time.time() |
|
102 | self.tic = time.time() | |
104 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) |
|
103 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | |
105 | self.caller.start() |
|
104 | self.caller.start() | |
106 |
|
105 | |||
107 | def add_new_heart_handler(self, handler): |
|
106 | def add_new_heart_handler(self, handler): | |
108 | """add a new handler for new hearts""" |
|
107 | """add a new handler for new hearts""" | |
109 | self.log.debug("heartbeat::new_heart_handler: %s", handler) |
|
108 | self.log.debug("heartbeat::new_heart_handler: %s", handler) | |
110 | self._new_handlers.add(handler) |
|
109 | self._new_handlers.add(handler) | |
111 |
|
110 | |||
112 | def add_heart_failure_handler(self, handler): |
|
111 | def add_heart_failure_handler(self, handler): | |
113 | """add a new handler for heart failure""" |
|
112 | """add a new handler for heart failure""" | |
114 | self.log.debug("heartbeat::new heart failure handler: %s", handler) |
|
113 | self.log.debug("heartbeat::new heart failure handler: %s", handler) | |
115 | self._failure_handlers.add(handler) |
|
114 | self._failure_handlers.add(handler) | |
116 |
|
115 | |||
117 | def beat(self): |
|
116 | def beat(self): | |
118 | self.pongstream.flush() |
|
117 | self.pongstream.flush() | |
119 | self.last_ping = self.lifetime |
|
118 | self.last_ping = self.lifetime | |
120 |
|
119 | |||
121 | toc = time.time() |
|
120 | toc = time.time() | |
122 | self.lifetime += toc-self.tic |
|
121 | self.lifetime += toc-self.tic | |
123 | self.tic = toc |
|
122 | self.tic = toc | |
|
123 | if self.debug: | |||
124 | self.log.debug("heartbeat::sending %s", self.lifetime) |
|
124 | self.log.debug("heartbeat::sending %s", self.lifetime) | |
125 | goodhearts = self.hearts.intersection(self.responses) |
|
125 | goodhearts = self.hearts.intersection(self.responses) | |
126 | missed_beats = self.hearts.difference(goodhearts) |
|
126 | missed_beats = self.hearts.difference(goodhearts) | |
127 | newhearts = self.responses.difference(goodhearts) |
|
127 | newhearts = self.responses.difference(goodhearts) | |
128 | for heart in newhearts: |
|
128 | for heart in newhearts: | |
129 | self.handle_new_heart(heart) |
|
129 | self.handle_new_heart(heart) | |
130 | heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation, |
|
130 | heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation, | |
131 | self.hearts) |
|
131 | self.hearts) | |
132 | for failure in heartfailures: |
|
132 | for failure in heartfailures: | |
133 | self.handle_heart_failure(failure) |
|
133 | self.handle_heart_failure(failure) | |
134 | self.on_probation = on_probation |
|
134 | self.on_probation = on_probation | |
135 | self.responses = set() |
|
135 | self.responses = set() | |
136 | #print self.on_probation, self.hearts |
|
136 | #print self.on_probation, self.hearts | |
137 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
137 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) | |
138 | self.pingstream.send(str_to_bytes(str(self.lifetime))) |
|
138 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
139 | # flush stream to force immediate socket send |
|
139 | # flush stream to force immediate socket send | |
140 | self.pingstream.flush() |
|
140 | self.pingstream.flush() | |
141 |
|
141 | |||
142 | def _check_missed(self, missed_beats, on_probation, hearts): |
|
142 | def _check_missed(self, missed_beats, on_probation, hearts): | |
143 | """Update heartbeats on probation, identifying any that have too many misses. |
|
143 | """Update heartbeats on probation, identifying any that have too many misses. | |
144 | """ |
|
144 | """ | |
145 | failures = [] |
|
145 | failures = [] | |
146 | new_probation = {} |
|
146 | new_probation = {} | |
147 | for cur_heart in (b for b in missed_beats if b in hearts): |
|
147 | for cur_heart in (b for b in missed_beats if b in hearts): | |
148 | miss_count = on_probation.get(cur_heart, 0) + 1 |
|
148 | miss_count = on_probation.get(cur_heart, 0) + 1 | |
149 | self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count)) |
|
149 | self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count)) | |
150 | if miss_count > self.max_heartmonitor_misses: |
|
150 | if miss_count > self.max_heartmonitor_misses: | |
151 | failures.append(cur_heart) |
|
151 | failures.append(cur_heart) | |
152 | else: |
|
152 | else: | |
153 | new_probation[cur_heart] = miss_count |
|
153 | new_probation[cur_heart] = miss_count | |
154 | return failures, new_probation |
|
154 | return failures, new_probation | |
155 |
|
155 | |||
156 | def handle_new_heart(self, heart): |
|
156 | def handle_new_heart(self, heart): | |
157 | if self._new_handlers: |
|
157 | if self._new_handlers: | |
158 | for handler in self._new_handlers: |
|
158 | for handler in self._new_handlers: | |
159 | handler(heart) |
|
159 | handler(heart) | |
160 | else: |
|
160 | else: | |
161 | self.log.info("heartbeat::yay, got new heart %s!", heart) |
|
161 | self.log.info("heartbeat::yay, got new heart %s!", heart) | |
162 | self.hearts.add(heart) |
|
162 | self.hearts.add(heart) | |
163 |
|
163 | |||
164 | def handle_heart_failure(self, heart): |
|
164 | def handle_heart_failure(self, heart): | |
165 | if self._failure_handlers: |
|
165 | if self._failure_handlers: | |
166 | for handler in self._failure_handlers: |
|
166 | for handler in self._failure_handlers: | |
167 | try: |
|
167 | try: | |
168 | handler(heart) |
|
168 | handler(heart) | |
169 | except Exception as e: |
|
169 | except Exception as e: | |
170 | self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True) |
|
170 | self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True) | |
171 | pass |
|
171 | pass | |
172 | else: |
|
172 | else: | |
173 | self.log.info("heartbeat::Heart %s failed :(", heart) |
|
173 | self.log.info("heartbeat::Heart %s failed :(", heart) | |
174 | self.hearts.remove(heart) |
|
174 | self.hearts.remove(heart) | |
175 |
|
175 | |||
176 |
|
176 | |||
177 | @log_errors |
|
177 | @log_errors | |
178 | def handle_pong(self, msg): |
|
178 | def handle_pong(self, msg): | |
179 | "a heart just beat" |
|
179 | "a heart just beat" | |
180 | current = str_to_bytes(str(self.lifetime)) |
|
180 | current = str_to_bytes(str(self.lifetime)) | |
181 | last = str_to_bytes(str(self.last_ping)) |
|
181 | last = str_to_bytes(str(self.last_ping)) | |
182 | if msg[1] == current: |
|
182 | if msg[1] == current: | |
183 | delta = time.time()-self.tic |
|
183 | delta = time.time()-self.tic | |
184 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
184 | if self.debug: | |
|
185 | self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta) | |||
185 | self.responses.add(msg[0]) |
|
186 | self.responses.add(msg[0]) | |
186 | elif msg[1] == last: |
|
187 | elif msg[1] == last: | |
187 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
188 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) | |
188 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta) |
|
189 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta) | |
189 | self.responses.add(msg[0]) |
|
190 | self.responses.add(msg[0]) | |
190 | else: |
|
191 | else: | |
191 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime) |
|
192 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime) | |
192 |
|
193 |
General Comments 0
You need to be logged in to leave comments.
Login now