##// END OF EJS Templates
Add default (None) values to HBChannel constructor...
Carlos Cordoba -
Show More
@@ -1,203 +1,203 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17
18 18 from IPython.core.release import kernel_protocol_version_info
19 19
20 20 from .channelsabc import HBChannelABC
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # Constants and exceptions
24 24 #-----------------------------------------------------------------------------
25 25
26 26 major_protocol_version = kernel_protocol_version_info[0]
27 27
28 28 class InvalidPortNumber(Exception):
29 29 pass
30 30
31 31 class HBChannel(Thread):
32 32 """The heartbeat channel which monitors the kernel heartbeat.
33 33
34 34 Note that the heartbeat channel is paused by default. As long as you start
35 35 this channel, the kernel manager will ensure that it is paused and un-paused
36 36 as appropriate.
37 37 """
38 38 context = None
39 39 session = None
40 40 socket = None
41 41 address = None
42 42 _exiting = False
43 43
44 44 time_to_dead = 1.
45 45 poller = None
46 46 _running = None
47 47 _pause = None
48 48 _beating = None
49 49
50 def __init__(self, context, session, address):
50 def __init__(self, context=None, session=None, address=None):
51 51 """Create the heartbeat monitor thread.
52 52
53 53 Parameters
54 54 ----------
55 55 context : :class:`zmq.Context`
56 56 The ZMQ context to use.
57 57 session : :class:`session.Session`
58 58 The session to use.
59 59 address : zmq url
60 60 Standard (ip, port) tuple that the kernel is listening on.
61 61 """
62 62 super(HBChannel, self).__init__()
63 63 self.daemon = True
64 64
65 65 self.context = context
66 66 self.session = session
67 67 if isinstance(address, tuple):
68 68 if address[1] == 0:
69 69 message = 'The port number for a channel cannot be 0.'
70 70 raise InvalidPortNumber(message)
71 71 address = "tcp://%s:%i" % address
72 72 self.address = address
73 73 atexit.register(self._notice_exit)
74 74
75 75 self._running = False
76 76 self._pause = True
77 77 self.poller = zmq.Poller()
78 78
79 79 def _notice_exit(self):
80 80 self._exiting = True
81 81
82 82 def _create_socket(self):
83 83 if self.socket is not None:
84 84 # close previous socket, before opening a new one
85 85 self.poller.unregister(self.socket)
86 86 self.socket.close()
87 87 self.socket = self.context.socket(zmq.REQ)
88 88 self.socket.linger = 1000
89 89 self.socket.connect(self.address)
90 90
91 91 self.poller.register(self.socket, zmq.POLLIN)
92 92
93 93 def _poll(self, start_time):
94 94 """poll for heartbeat replies until we reach self.time_to_dead.
95 95
96 96 Ignores interrupts, and returns the result of poll(), which
97 97 will be an empty list if no messages arrived before the timeout,
98 98 or the event tuple if there is a message to receive.
99 99 """
100 100
101 101 until_dead = self.time_to_dead - (time.time() - start_time)
102 102 # ensure poll at least once
103 103 until_dead = max(until_dead, 1e-3)
104 104 events = []
105 105 while True:
106 106 try:
107 107 events = self.poller.poll(1000 * until_dead)
108 108 except ZMQError as e:
109 109 if e.errno == errno.EINTR:
110 110 # ignore interrupts during heartbeat
111 111 # this may never actually happen
112 112 until_dead = self.time_to_dead - (time.time() - start_time)
113 113 until_dead = max(until_dead, 1e-3)
114 114 pass
115 115 else:
116 116 raise
117 117 except Exception:
118 118 if self._exiting:
119 119 break
120 120 else:
121 121 raise
122 122 else:
123 123 break
124 124 return events
125 125
126 126 def run(self):
127 127 """The thread's main activity. Call start() instead."""
128 128 self._create_socket()
129 129 self._running = True
130 130 self._beating = True
131 131
132 132 while self._running:
133 133 if self._pause:
134 134 # just sleep, and skip the rest of the loop
135 135 time.sleep(self.time_to_dead)
136 136 continue
137 137
138 138 since_last_heartbeat = 0.0
139 139 # io.rprint('Ping from HB channel') # dbg
140 140 # no need to catch EFSM here, because the previous event was
141 141 # either a recv or connect, which cannot be followed by EFSM
142 142 self.socket.send(b'ping')
143 143 request_time = time.time()
144 144 ready = self._poll(request_time)
145 145 if ready:
146 146 self._beating = True
147 147 # the poll above guarantees we have something to recv
148 148 self.socket.recv()
149 149 # sleep the remainder of the cycle
150 150 remainder = self.time_to_dead - (time.time() - request_time)
151 151 if remainder > 0:
152 152 time.sleep(remainder)
153 153 continue
154 154 else:
155 155 # nothing was received within the time limit, signal heart failure
156 156 self._beating = False
157 157 since_last_heartbeat = time.time() - request_time
158 158 self.call_handlers(since_last_heartbeat)
159 159 # and close/reopen the socket, because the REQ/REP cycle has been broken
160 160 self._create_socket()
161 161 continue
162 162
163 163 def pause(self):
164 164 """Pause the heartbeat."""
165 165 self._pause = True
166 166
167 167 def unpause(self):
168 168 """Unpause the heartbeat."""
169 169 self._pause = False
170 170
171 171 def is_beating(self):
172 172 """Is the heartbeat running and responsive (and not paused)."""
173 173 if self.is_alive() and not self._pause and self._beating:
174 174 return True
175 175 else:
176 176 return False
177 177
178 178 def stop(self):
179 179 """Stop the channel's event loop and join its thread."""
180 180 self._running = False
181 181 self.join()
182 182 self.close()
183 183
184 184 def close(self):
185 185 if self.socket is not None:
186 186 try:
187 187 self.socket.close(linger=0)
188 188 except Exception:
189 189 pass
190 190 self.socket = None
191 191
192 192 def call_handlers(self, since_last_heartbeat):
193 193 """This method is called in the ioloop thread when a message arrives.
194 194
195 195 Subclasses should override this method to handle incoming messages.
196 196 It is important to remember that this method is called in the thread
197 197 so that some logic must be done to ensure that the application level
198 198 handlers are called in the application thread.
199 199 """
200 200 pass
201 201
202 202
203 203 HBChannelABC.register(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now