##// END OF EJS Templates
Get rid of unused Qt signals for specific message types
Thomas Kluyver -
Show More
@@ -1,370 +1,318 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject, Thread):
57 57 """The base class for the channels that use ZMQ sockets."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 62 _exiting = False
63 63 proxy_methods = []
64 64
65 65 # Emitted when the channel is started.
66 66 started = QtCore.Signal()
67 67
68 68 # Emitted when the channel is stopped.
69 69 stopped = QtCore.Signal()
70 70
71 71 message_received = QtCore.Signal(object)
72 72
73 73 #---------------------------------------------------------------------------
74 74 # InProcessChannel interface
75 75 #---------------------------------------------------------------------------
76 76
77 77 def call_handlers_later(self, *args, **kwds):
78 78 """ Call the message handlers later.
79 79 """
80 80 do_later = lambda: self.call_handlers(*args, **kwds)
81 81 QtCore.QTimer.singleShot(0, do_later)
82 82
83 83 def process_events(self):
84 84 """ Process any pending GUI events.
85 85 """
86 86 QtCore.QCoreApplication.instance().processEvents()
87 87
88 88 def __init__(self, socket, session):
89 89 """Create a channel.
90 90
91 91 Parameters
92 92 ----------
93 93 context : :class:`zmq.Context`
94 94 The ZMQ context to use.
95 95 session : :class:`session.Session`
96 96 The session to use.
97 97 address : zmq url
98 98 Standard (ip, port) tuple that the kernel is listening on.
99 99 """
100 100 super(QtZMQSocketChannel, self).__init__()
101 101 self.daemon = True
102 102
103 103 self.socket = socket
104 104 self.session = session
105 105 atexit.register(self._notice_exit)
106 106
107 107 def _notice_exit(self):
108 108 self._exiting = True
109 109
110 110 def _run_loop(self):
111 111 """Run my loop, ignoring EINTR events in the poller"""
112 112 while True:
113 113 try:
114 114 self.ioloop.start()
115 115 except ZMQError as e:
116 116 if e.errno == errno.EINTR:
117 117 continue
118 118 else:
119 119 raise
120 120 except Exception:
121 121 if self._exiting:
122 122 break
123 123 else:
124 124 raise
125 125 else:
126 126 break
127 127
128 128 def start(self):
129 129 """ Reimplemented to emit signal.
130 130 """
131 131 super(QtZMQSocketChannel, self).start()
132 132 self.started.emit()
133 133
134 134 def stop(self):
135 135 """Stop the channel's event loop and join its thread.
136 136
137 137 This calls :meth:`~threading.Thread.join` and returns when the thread
138 138 terminates. :class:`RuntimeError` will be raised if
139 139 :meth:`~threading.Thread.start` is called again.
140 140 """
141 141 if self.ioloop is not None:
142 142 self.ioloop.stop()
143 143 self.join()
144 144 self.close()
145 145 self.stopped.emit()
146 146
147 147 def close(self):
148 148 if self.ioloop is not None:
149 149 try:
150 150 self.ioloop.close(all_fds=True)
151 151 except Exception:
152 152 pass
153 153 if self.socket is not None:
154 154 try:
155 155 self.socket.close(linger=0)
156 156 except Exception:
157 157 pass
158 158 self.socket = None
159 159
160 160 @property
161 161 def address(self):
162 162 """Get the channel's address as a zmq url string.
163 163
164 164 These URLS have the form: 'tcp://127.0.0.1:5555'.
165 165 """
166 166 return self._address
167 167
168 168 def _queue_send(self, msg):
169 169 """Queue a message to be sent from the IOLoop's thread.
170 170
171 171 Parameters
172 172 ----------
173 173 msg : message to send
174 174
175 175 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
176 176 thread control of the action.
177 177 """
178 178 def thread_send():
179 179 self.session.send(self.stream, msg)
180 180 self.ioloop.add_callback(thread_send)
181 181
182 182 def _handle_recv(self, msg):
183 183 """Callback for stream.on_recv.
184 184
185 185 Unpacks message, and calls handlers with it.
186 186 """
187 187 ident,smsg = self.session.feed_identities(msg)
188 188 msg = self.session.deserialize(smsg)
189 189 self.call_handlers(msg)
190 190
191 191 def call_handlers(self, msg):
192 192 """This method is called in the ioloop thread when a message arrives.
193 193
194 194 Subclasses should override this method to handle incoming messages.
195 195 It is important to remember that this method is called in the thread
196 196 so that some logic must be done to ensure that the application level
197 197 handlers are called in the application thread.
198 198 """
199 199 # Emit the generic signal.
200 200 self.message_received.emit(msg)
201 201
202 202
203 203 class QtShellChannel(QtZMQSocketChannel):
204 204 """The shell channel for issuing request/replies to the kernel."""
205 205
206 # Emitted when a reply has been received for the corresponding request type.
207 execute_reply = QtCore.Signal(object)
208 complete_reply = QtCore.Signal(object)
209 inspect_reply = QtCore.Signal(object)
210 history_reply = QtCore.Signal(object)
211 kernel_info_reply = QtCore.Signal(object)
212
213 206 def __init__(self, socket, session):
214 207 super(QtShellChannel, self).__init__(socket, session)
215 208 self.ioloop = ioloop.IOLoop()
216 209
217 210 def run(self):
218 211 """The thread's main activity. Call start() instead."""
219 212 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
220 213 self.stream.on_recv(self._handle_recv)
221 214 self._run_loop()
222 215
223 216 def call_handlers(self, msg):
224 217 super(QtShellChannel, self).call_handlers(msg)
225 218
226 219 # Catch kernel_info_reply for message spec adaptation
227 220 msg_type = msg['header']['msg_type']
228 221 if msg_type == 'kernel_info_reply':
229 222 self._handle_kernel_info_reply(msg)
230 223
231 # Emit specific signals
232 signal = getattr(self, msg_type, None)
233 if signal:
234 signal.emit(msg)
235
236 224 def _handle_kernel_info_reply(self, msg):
237 225 """handle kernel info reply
238 226
239 227 sets protocol adaptation version
240 228 """
241 229 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
242 230 if adapt_version != major_protocol_version:
243 231 self.session.adapt_version = adapt_version
244 232
245 233
246 234 class QtIOPubChannel(QtZMQSocketChannel):
247 235 """The iopub channel which listens for messages that the kernel publishes.
248 236
249 237 This channel is where all output is published to frontends.
250 238 """
251 # Emitted when a message of type 'stream' is received.
252 stream_received = QtCore.Signal(object)
253
254 # Emitted when a message of type 'execute_input' is received.
255 execute_input_received = QtCore.Signal(object)
256
257 # Emitted when a message of type 'execute_result' is received.
258 execute_result_received = QtCore.Signal(object)
259
260 # Emitted when a message of type 'error' is received.
261 error_received = QtCore.Signal(object)
262
263 # Emitted when a message of type 'display_data' is received
264 display_data_received = QtCore.Signal(object)
265
266 # Emitted when a crash report message is received from the kernel's
267 # last-resort sys.excepthook.
268 crash_received = QtCore.Signal(object)
269
270 # Emitted when a shutdown is noticed.
271 shutdown_reply_received = QtCore.Signal(object)
272 239
273 240 def __init__(self, socket, session):
274 241 super(QtIOPubChannel, self).__init__(socket, session)
275 242 self.ioloop = ioloop.IOLoop()
276 243
277 244 def run(self):
278 245 """The thread's main activity. Call start() instead."""
279 246 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
280 247 self.stream.on_recv(self._handle_recv)
281 248 self._run_loop()
282 249
283 def call_handlers(self, msg):
284 super(QtIOPubChannel, self).call_handlers(msg)
285
286 # Emit signals for specialized message types.
287 msg_type = msg['header']['msg_type']
288 signal = getattr(self, msg_type + '_received', None)
289 if signal:
290 signal.emit(msg)
291
292 250 def flush(self, timeout=1.0):
293 251 """Immediately processes all pending messages on the iopub channel.
294 252
295 253 Callers should use this method to ensure that :meth:`call_handlers`
296 254 has been called for all messages that have been received on the
297 255 0MQ SUB socket of this channel.
298 256
299 257 This method is thread safe.
300 258
301 259 Parameters
302 260 ----------
303 261 timeout : float, optional
304 262 The maximum amount of time to spend flushing, in seconds. The
305 263 default is one second.
306 264 """
307 265 # We do the IOLoop callback process twice to ensure that the IOLoop
308 266 # gets to perform at least one full poll.
309 267 stop_time = time.time() + timeout
310 268 for i in range(2):
311 269 self._flushed = False
312 270 self.ioloop.add_callback(self._flush)
313 271 while not self._flushed and time.time() < stop_time:
314 272 time.sleep(0.01)
315 273
316 274 def _flush(self):
317 275 """Callback for :method:`self.flush`."""
318 276 self.stream.flush()
319 277 self._flushed = True
320 278
321 279
322 280 class QtStdInChannel(QtZMQSocketChannel):
323 281 """The stdin channel to handle raw_input requests that the kernel makes."""
324 282
325 283 msg_queue = None
326 284 proxy_methods = ['input']
327 285
328 # Emitted when an input request is received.
329 input_requested = QtCore.Signal(object)
330
331 286 def __init__(self, socket, session):
332 287 super(QtStdInChannel, self).__init__(socket, session)
333 288 self.ioloop = ioloop.IOLoop()
334 289
335 290 def run(self):
336 291 """The thread's main activity. Call start() instead."""
337 292 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
338 293 self.stream.on_recv(self._handle_recv)
339 294 self._run_loop()
340 295
341 def call_handlers(self, msg):
342 super(QtStdInChannel, self).call_handlers(msg)
343
344 # Emit signals for specialized message types.
345 msg_type = msg['header']['msg_type']
346 if msg_type == 'input_request':
347 self.input_requested.emit(msg)
348
349 296
350 297 ShellChannelABC.register(QtShellChannel)
351 298 IOPubChannelABC.register(QtIOPubChannel)
352 299 StdInChannelABC.register(QtStdInChannel)
353 300
354 301
355 302 class QtKernelClient(QtKernelClientMixin, KernelClient):
356 303 """ A KernelClient that provides signals and slots.
357 304 """
358 305 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
359 306 if shell:
360 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
307 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
361 308 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
362 309
363 def _handle_kernel_info_reply(self, msg):
364 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
365 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
310 def _check_kernel_info_reply(self, msg):
311 if msg['msg_type'] == 'kernel_info_reply':
312 self._handle_kernel_info_reply(msg)
313 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
366 314
367 315 iopub_channel_class = Type(QtIOPubChannel)
368 316 shell_channel_class = Type(QtShellChannel)
369 317 stdin_channel_class = Type(QtStdInChannel)
370 318 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now