##// END OF EJS Templates
Collapse ZMQSocketChannel into HBChannel class
Thomas Kluyver -
Show More
@@ -62,124 +62,6 b' def validate_string_dict(dct):'
62 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
63
63
64
64
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
68
69 class ZMQSocketChannel(Thread):
70 """The base class for the channels that use ZMQ sockets."""
71 context = None
72 session = None
73 socket = None
74 ioloop = None
75 stream = None
76 _address = None
77 _exiting = False
78 proxy_methods = []
79
80 def __init__(self, context, session, address):
81 """Create a channel.
82
83 Parameters
84 ----------
85 context : :class:`zmq.Context`
86 The ZMQ context to use.
87 session : :class:`session.Session`
88 The session to use.
89 address : zmq url
90 Standard (ip, port) tuple that the kernel is listening on.
91 """
92 super(ZMQSocketChannel, self).__init__()
93 self.daemon = True
94
95 self.context = context
96 self.session = session
97 if isinstance(address, tuple):
98 if address[1] == 0:
99 message = 'The port number for a channel cannot be 0.'
100 raise InvalidPortNumber(message)
101 address = "tcp://%s:%i" % address
102 self._address = address
103 atexit.register(self._notice_exit)
104
105 def _notice_exit(self):
106 self._exiting = True
107
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
111 try:
112 self.ioloop.start()
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
115 continue
116 else:
117 raise
118 except Exception:
119 if self._exiting:
120 break
121 else:
122 raise
123 else:
124 break
125
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
132 """
133 if self.ioloop is not None:
134 self.ioloop.stop()
135 self.join()
136 self.close()
137
138 def close(self):
139 if self.ioloop is not None:
140 try:
141 self.ioloop.close(all_fds=True)
142 except Exception:
143 pass
144 if self.socket is not None:
145 try:
146 self.socket.close(linger=0)
147 except Exception:
148 pass
149 self.socket = None
150
151 @property
152 def address(self):
153 """Get the channel's address as a zmq url string.
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
157 return self._address
158
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
161
162 Parameters
163 ----------
164 msg : message to send
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
168 """
169 def thread_send():
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
172
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
175
176 Unpacks message, and calls handlers with it.
177 """
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
181
182
183 def make_shell_socket(context, identity, address):
65 def make_shell_socket(context, identity, address):
184 socket = context.socket(zmq.DEALER)
66 socket = context.socket(zmq.DEALER)
185 socket.linger = 1000
67 socket.linger = 1000
@@ -202,27 +84,57 b' def make_stdin_socket(context, identity, address):'
202 socket.connect(address)
84 socket.connect(address)
203 return socket
85 return socket
204
86
205 class HBChannel(ZMQSocketChannel):
87 class HBChannel(Thread):
206 """The heartbeat channel which monitors the kernel heartbeat.
88 """The heartbeat channel which monitors the kernel heartbeat.
207
89
208 Note that the heartbeat channel is paused by default. As long as you start
90 Note that the heartbeat channel is paused by default. As long as you start
209 this channel, the kernel manager will ensure that it is paused and un-paused
91 this channel, the kernel manager will ensure that it is paused and un-paused
210 as appropriate.
92 as appropriate.
211 """
93 """
94 context = None
95 session = None
96 socket = None
97 address = None
98 _exiting = False
212
99
213 time_to_dead = 1.
100 time_to_dead = 1.
214 socket = None
215 poller = None
101 poller = None
216 _running = None
102 _running = None
217 _pause = None
103 _pause = None
218 _beating = None
104 _beating = None
219
105
220 def __init__(self, context, session, address):
106 def __init__(self, context, session, address):
221 super(HBChannel, self).__init__(context, session, address)
107 """Create the heartbeat monitor thread.
108
109 Parameters
110 ----------
111 context : :class:`zmq.Context`
112 The ZMQ context to use.
113 session : :class:`session.Session`
114 The session to use.
115 address : zmq url
116 Standard (ip, port) tuple that the kernel is listening on.
117 """
118 super(HBChannel, self).__init__()
119 self.daemon = True
120
121 self.context = context
122 self.session = session
123 if isinstance(address, tuple):
124 if address[1] == 0:
125 message = 'The port number for a channel cannot be 0.'
126 raise InvalidPortNumber(message)
127 address = "tcp://%s:%i" % address
128 self.address = address
129 atexit.register(self._notice_exit)
130
222 self._running = False
131 self._running = False
223 self._pause =True
132 self._pause = True
224 self.poller = zmq.Poller()
133 self.poller = zmq.Poller()
225
134
135 def _notice_exit(self):
136 self._exiting = True
137
226 def _create_socket(self):
138 def _create_socket(self):
227 if self.socket is not None:
139 if self.socket is not None:
228 # close previous socket, before opening a new one
140 # close previous socket, before opening a new one
@@ -322,7 +234,16 b' class HBChannel(ZMQSocketChannel):'
322 def stop(self):
234 def stop(self):
323 """Stop the channel's event loop and join its thread."""
235 """Stop the channel's event loop and join its thread."""
324 self._running = False
236 self._running = False
325 super(HBChannel, self).stop()
237 self.join()
238 self.close()
239
240 def close(self):
241 if self.socket is not None:
242 try:
243 self.socket.close(linger=0)
244 except Exception:
245 pass
246 self.socket = None
326
247
327 def call_handlers(self, since_last_heartbeat):
248 def call_handlers(self, since_last_heartbeat):
328 """This method is called in the ioloop thread when a message arrives.
249 """This method is called in the ioloop thread when a message arrives.
General Comments 0
You need to be logged in to leave comments. Login now