##// END OF EJS Templates
Add default (None) values to HBChannel constructor...
Carlos Cordoba -
Show More
@@ -1,203 +1,203 b''
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17
17
18 from IPython.core.release import kernel_protocol_version_info
18 from IPython.core.release import kernel_protocol_version_info
19
19
20 from .channelsabc import HBChannelABC
20 from .channelsabc import HBChannelABC
21
21
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23 # Constants and exceptions
23 # Constants and exceptions
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 major_protocol_version = kernel_protocol_version_info[0]
26 major_protocol_version = kernel_protocol_version_info[0]
27
27
28 class InvalidPortNumber(Exception):
28 class InvalidPortNumber(Exception):
29 pass
29 pass
30
30
31 class HBChannel(Thread):
31 class HBChannel(Thread):
32 """The heartbeat channel which monitors the kernel heartbeat.
32 """The heartbeat channel which monitors the kernel heartbeat.
33
33
34 Note that the heartbeat channel is paused by default. As long as you start
34 Note that the heartbeat channel is paused by default. As long as you start
35 this channel, the kernel manager will ensure that it is paused and un-paused
35 this channel, the kernel manager will ensure that it is paused and un-paused
36 as appropriate.
36 as appropriate.
37 """
37 """
38 context = None
38 context = None
39 session = None
39 session = None
40 socket = None
40 socket = None
41 address = None
41 address = None
42 _exiting = False
42 _exiting = False
43
43
44 time_to_dead = 1.
44 time_to_dead = 1.
45 poller = None
45 poller = None
46 _running = None
46 _running = None
47 _pause = None
47 _pause = None
48 _beating = None
48 _beating = None
49
49
50 def __init__(self, context, session, address):
50 def __init__(self, context=None, session=None, address=None):
51 """Create the heartbeat monitor thread.
51 """Create the heartbeat monitor thread.
52
52
53 Parameters
53 Parameters
54 ----------
54 ----------
55 context : :class:`zmq.Context`
55 context : :class:`zmq.Context`
56 The ZMQ context to use.
56 The ZMQ context to use.
57 session : :class:`session.Session`
57 session : :class:`session.Session`
58 The session to use.
58 The session to use.
59 address : zmq url
59 address : zmq url
60 Standard (ip, port) tuple that the kernel is listening on.
60 Standard (ip, port) tuple that the kernel is listening on.
61 """
61 """
62 super(HBChannel, self).__init__()
62 super(HBChannel, self).__init__()
63 self.daemon = True
63 self.daemon = True
64
64
65 self.context = context
65 self.context = context
66 self.session = session
66 self.session = session
67 if isinstance(address, tuple):
67 if isinstance(address, tuple):
68 if address[1] == 0:
68 if address[1] == 0:
69 message = 'The port number for a channel cannot be 0.'
69 message = 'The port number for a channel cannot be 0.'
70 raise InvalidPortNumber(message)
70 raise InvalidPortNumber(message)
71 address = "tcp://%s:%i" % address
71 address = "tcp://%s:%i" % address
72 self.address = address
72 self.address = address
73 atexit.register(self._notice_exit)
73 atexit.register(self._notice_exit)
74
74
75 self._running = False
75 self._running = False
76 self._pause = True
76 self._pause = True
77 self.poller = zmq.Poller()
77 self.poller = zmq.Poller()
78
78
79 def _notice_exit(self):
79 def _notice_exit(self):
80 self._exiting = True
80 self._exiting = True
81
81
82 def _create_socket(self):
82 def _create_socket(self):
83 if self.socket is not None:
83 if self.socket is not None:
84 # close previous socket, before opening a new one
84 # close previous socket, before opening a new one
85 self.poller.unregister(self.socket)
85 self.poller.unregister(self.socket)
86 self.socket.close()
86 self.socket.close()
87 self.socket = self.context.socket(zmq.REQ)
87 self.socket = self.context.socket(zmq.REQ)
88 self.socket.linger = 1000
88 self.socket.linger = 1000
89 self.socket.connect(self.address)
89 self.socket.connect(self.address)
90
90
91 self.poller.register(self.socket, zmq.POLLIN)
91 self.poller.register(self.socket, zmq.POLLIN)
92
92
93 def _poll(self, start_time):
93 def _poll(self, start_time):
94 """poll for heartbeat replies until we reach self.time_to_dead.
94 """poll for heartbeat replies until we reach self.time_to_dead.
95
95
96 Ignores interrupts, and returns the result of poll(), which
96 Ignores interrupts, and returns the result of poll(), which
97 will be an empty list if no messages arrived before the timeout,
97 will be an empty list if no messages arrived before the timeout,
98 or the event tuple if there is a message to receive.
98 or the event tuple if there is a message to receive.
99 """
99 """
100
100
101 until_dead = self.time_to_dead - (time.time() - start_time)
101 until_dead = self.time_to_dead - (time.time() - start_time)
102 # ensure poll at least once
102 # ensure poll at least once
103 until_dead = max(until_dead, 1e-3)
103 until_dead = max(until_dead, 1e-3)
104 events = []
104 events = []
105 while True:
105 while True:
106 try:
106 try:
107 events = self.poller.poll(1000 * until_dead)
107 events = self.poller.poll(1000 * until_dead)
108 except ZMQError as e:
108 except ZMQError as e:
109 if e.errno == errno.EINTR:
109 if e.errno == errno.EINTR:
110 # ignore interrupts during heartbeat
110 # ignore interrupts during heartbeat
111 # this may never actually happen
111 # this may never actually happen
112 until_dead = self.time_to_dead - (time.time() - start_time)
112 until_dead = self.time_to_dead - (time.time() - start_time)
113 until_dead = max(until_dead, 1e-3)
113 until_dead = max(until_dead, 1e-3)
114 pass
114 pass
115 else:
115 else:
116 raise
116 raise
117 except Exception:
117 except Exception:
118 if self._exiting:
118 if self._exiting:
119 break
119 break
120 else:
120 else:
121 raise
121 raise
122 else:
122 else:
123 break
123 break
124 return events
124 return events
125
125
126 def run(self):
126 def run(self):
127 """The thread's main activity. Call start() instead."""
127 """The thread's main activity. Call start() instead."""
128 self._create_socket()
128 self._create_socket()
129 self._running = True
129 self._running = True
130 self._beating = True
130 self._beating = True
131
131
132 while self._running:
132 while self._running:
133 if self._pause:
133 if self._pause:
134 # just sleep, and skip the rest of the loop
134 # just sleep, and skip the rest of the loop
135 time.sleep(self.time_to_dead)
135 time.sleep(self.time_to_dead)
136 continue
136 continue
137
137
138 since_last_heartbeat = 0.0
138 since_last_heartbeat = 0.0
139 # io.rprint('Ping from HB channel') # dbg
139 # io.rprint('Ping from HB channel') # dbg
140 # no need to catch EFSM here, because the previous event was
140 # no need to catch EFSM here, because the previous event was
141 # either a recv or connect, which cannot be followed by EFSM
141 # either a recv or connect, which cannot be followed by EFSM
142 self.socket.send(b'ping')
142 self.socket.send(b'ping')
143 request_time = time.time()
143 request_time = time.time()
144 ready = self._poll(request_time)
144 ready = self._poll(request_time)
145 if ready:
145 if ready:
146 self._beating = True
146 self._beating = True
147 # the poll above guarantees we have something to recv
147 # the poll above guarantees we have something to recv
148 self.socket.recv()
148 self.socket.recv()
149 # sleep the remainder of the cycle
149 # sleep the remainder of the cycle
150 remainder = self.time_to_dead - (time.time() - request_time)
150 remainder = self.time_to_dead - (time.time() - request_time)
151 if remainder > 0:
151 if remainder > 0:
152 time.sleep(remainder)
152 time.sleep(remainder)
153 continue
153 continue
154 else:
154 else:
155 # nothing was received within the time limit, signal heart failure
155 # nothing was received within the time limit, signal heart failure
156 self._beating = False
156 self._beating = False
157 since_last_heartbeat = time.time() - request_time
157 since_last_heartbeat = time.time() - request_time
158 self.call_handlers(since_last_heartbeat)
158 self.call_handlers(since_last_heartbeat)
159 # and close/reopen the socket, because the REQ/REP cycle has been broken
159 # and close/reopen the socket, because the REQ/REP cycle has been broken
160 self._create_socket()
160 self._create_socket()
161 continue
161 continue
162
162
163 def pause(self):
163 def pause(self):
164 """Pause the heartbeat."""
164 """Pause the heartbeat."""
165 self._pause = True
165 self._pause = True
166
166
167 def unpause(self):
167 def unpause(self):
168 """Unpause the heartbeat."""
168 """Unpause the heartbeat."""
169 self._pause = False
169 self._pause = False
170
170
171 def is_beating(self):
171 def is_beating(self):
172 """Is the heartbeat running and responsive (and not paused)."""
172 """Is the heartbeat running and responsive (and not paused)."""
173 if self.is_alive() and not self._pause and self._beating:
173 if self.is_alive() and not self._pause and self._beating:
174 return True
174 return True
175 else:
175 else:
176 return False
176 return False
177
177
178 def stop(self):
178 def stop(self):
179 """Stop the channel's event loop and join its thread."""
179 """Stop the channel's event loop and join its thread."""
180 self._running = False
180 self._running = False
181 self.join()
181 self.join()
182 self.close()
182 self.close()
183
183
184 def close(self):
184 def close(self):
185 if self.socket is not None:
185 if self.socket is not None:
186 try:
186 try:
187 self.socket.close(linger=0)
187 self.socket.close(linger=0)
188 except Exception:
188 except Exception:
189 pass
189 pass
190 self.socket = None
190 self.socket = None
191
191
192 def call_handlers(self, since_last_heartbeat):
192 def call_handlers(self, since_last_heartbeat):
193 """This method is called in the ioloop thread when a message arrives.
193 """This method is called in the ioloop thread when a message arrives.
194
194
195 Subclasses should override this method to handle incoming messages.
195 Subclasses should override this method to handle incoming messages.
196 It is important to remember that this method is called in the thread
196 It is important to remember that this method is called in the thread
197 so that some logic must be done to ensure that the application level
197 so that some logic must be done to ensure that the application level
198 handlers are called in the application thread.
198 handlers are called in the application thread.
199 """
199 """
200 pass
200 pass
201
201
202
202
203 HBChannelABC.register(HBChannel)
203 HBChannelABC.register(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now