##// END OF EJS Templates
Get rid of unused Qt signals for specific message types
Thomas Kluyver -
Show More
@@ -1,370 +1,318 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject, Thread):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62 _exiting = False
62 _exiting = False
63 proxy_methods = []
63 proxy_methods = []
64
64
65 # Emitted when the channel is started.
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
66 started = QtCore.Signal()
67
67
68 # Emitted when the channel is stopped.
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
69 stopped = QtCore.Signal()
70
70
71 message_received = QtCore.Signal(object)
71 message_received = QtCore.Signal(object)
72
72
73 #---------------------------------------------------------------------------
73 #---------------------------------------------------------------------------
74 # InProcessChannel interface
74 # InProcessChannel interface
75 #---------------------------------------------------------------------------
75 #---------------------------------------------------------------------------
76
76
77 def call_handlers_later(self, *args, **kwds):
77 def call_handlers_later(self, *args, **kwds):
78 """ Call the message handlers later.
78 """ Call the message handlers later.
79 """
79 """
80 do_later = lambda: self.call_handlers(*args, **kwds)
80 do_later = lambda: self.call_handlers(*args, **kwds)
81 QtCore.QTimer.singleShot(0, do_later)
81 QtCore.QTimer.singleShot(0, do_later)
82
82
83 def process_events(self):
83 def process_events(self):
84 """ Process any pending GUI events.
84 """ Process any pending GUI events.
85 """
85 """
86 QtCore.QCoreApplication.instance().processEvents()
86 QtCore.QCoreApplication.instance().processEvents()
87
87
88 def __init__(self, socket, session):
88 def __init__(self, socket, session):
89 """Create a channel.
89 """Create a channel.
90
90
91 Parameters
91 Parameters
92 ----------
92 ----------
93 context : :class:`zmq.Context`
93 context : :class:`zmq.Context`
94 The ZMQ context to use.
94 The ZMQ context to use.
95 session : :class:`session.Session`
95 session : :class:`session.Session`
96 The session to use.
96 The session to use.
97 address : zmq url
97 address : zmq url
98 Standard (ip, port) tuple that the kernel is listening on.
98 Standard (ip, port) tuple that the kernel is listening on.
99 """
99 """
100 super(QtZMQSocketChannel, self).__init__()
100 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
101 self.daemon = True
102
102
103 self.socket = socket
103 self.socket = socket
104 self.session = session
104 self.session = session
105 atexit.register(self._notice_exit)
105 atexit.register(self._notice_exit)
106
106
107 def _notice_exit(self):
107 def _notice_exit(self):
108 self._exiting = True
108 self._exiting = True
109
109
110 def _run_loop(self):
110 def _run_loop(self):
111 """Run my loop, ignoring EINTR events in the poller"""
111 """Run my loop, ignoring EINTR events in the poller"""
112 while True:
112 while True:
113 try:
113 try:
114 self.ioloop.start()
114 self.ioloop.start()
115 except ZMQError as e:
115 except ZMQError as e:
116 if e.errno == errno.EINTR:
116 if e.errno == errno.EINTR:
117 continue
117 continue
118 else:
118 else:
119 raise
119 raise
120 except Exception:
120 except Exception:
121 if self._exiting:
121 if self._exiting:
122 break
122 break
123 else:
123 else:
124 raise
124 raise
125 else:
125 else:
126 break
126 break
127
127
128 def start(self):
128 def start(self):
129 """ Reimplemented to emit signal.
129 """ Reimplemented to emit signal.
130 """
130 """
131 super(QtZMQSocketChannel, self).start()
131 super(QtZMQSocketChannel, self).start()
132 self.started.emit()
132 self.started.emit()
133
133
134 def stop(self):
134 def stop(self):
135 """Stop the channel's event loop and join its thread.
135 """Stop the channel's event loop and join its thread.
136
136
137 This calls :meth:`~threading.Thread.join` and returns when the thread
137 This calls :meth:`~threading.Thread.join` and returns when the thread
138 terminates. :class:`RuntimeError` will be raised if
138 terminates. :class:`RuntimeError` will be raised if
139 :meth:`~threading.Thread.start` is called again.
139 :meth:`~threading.Thread.start` is called again.
140 """
140 """
141 if self.ioloop is not None:
141 if self.ioloop is not None:
142 self.ioloop.stop()
142 self.ioloop.stop()
143 self.join()
143 self.join()
144 self.close()
144 self.close()
145 self.stopped.emit()
145 self.stopped.emit()
146
146
147 def close(self):
147 def close(self):
148 if self.ioloop is not None:
148 if self.ioloop is not None:
149 try:
149 try:
150 self.ioloop.close(all_fds=True)
150 self.ioloop.close(all_fds=True)
151 except Exception:
151 except Exception:
152 pass
152 pass
153 if self.socket is not None:
153 if self.socket is not None:
154 try:
154 try:
155 self.socket.close(linger=0)
155 self.socket.close(linger=0)
156 except Exception:
156 except Exception:
157 pass
157 pass
158 self.socket = None
158 self.socket = None
159
159
160 @property
160 @property
161 def address(self):
161 def address(self):
162 """Get the channel's address as a zmq url string.
162 """Get the channel's address as a zmq url string.
163
163
164 These URLS have the form: 'tcp://127.0.0.1:5555'.
164 These URLS have the form: 'tcp://127.0.0.1:5555'.
165 """
165 """
166 return self._address
166 return self._address
167
167
168 def _queue_send(self, msg):
168 def _queue_send(self, msg):
169 """Queue a message to be sent from the IOLoop's thread.
169 """Queue a message to be sent from the IOLoop's thread.
170
170
171 Parameters
171 Parameters
172 ----------
172 ----------
173 msg : message to send
173 msg : message to send
174
174
175 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
175 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
176 thread control of the action.
176 thread control of the action.
177 """
177 """
178 def thread_send():
178 def thread_send():
179 self.session.send(self.stream, msg)
179 self.session.send(self.stream, msg)
180 self.ioloop.add_callback(thread_send)
180 self.ioloop.add_callback(thread_send)
181
181
182 def _handle_recv(self, msg):
182 def _handle_recv(self, msg):
183 """Callback for stream.on_recv.
183 """Callback for stream.on_recv.
184
184
185 Unpacks message, and calls handlers with it.
185 Unpacks message, and calls handlers with it.
186 """
186 """
187 ident,smsg = self.session.feed_identities(msg)
187 ident,smsg = self.session.feed_identities(msg)
188 msg = self.session.deserialize(smsg)
188 msg = self.session.deserialize(smsg)
189 self.call_handlers(msg)
189 self.call_handlers(msg)
190
190
191 def call_handlers(self, msg):
191 def call_handlers(self, msg):
192 """This method is called in the ioloop thread when a message arrives.
192 """This method is called in the ioloop thread when a message arrives.
193
193
194 Subclasses should override this method to handle incoming messages.
194 Subclasses should override this method to handle incoming messages.
195 It is important to remember that this method is called in the thread
195 It is important to remember that this method is called in the thread
196 so that some logic must be done to ensure that the application level
196 so that some logic must be done to ensure that the application level
197 handlers are called in the application thread.
197 handlers are called in the application thread.
198 """
198 """
199 # Emit the generic signal.
199 # Emit the generic signal.
200 self.message_received.emit(msg)
200 self.message_received.emit(msg)
201
201
202
202
203 class QtShellChannel(QtZMQSocketChannel):
203 class QtShellChannel(QtZMQSocketChannel):
204 """The shell channel for issuing request/replies to the kernel."""
204 """The shell channel for issuing request/replies to the kernel."""
205
205
206 # Emitted when a reply has been received for the corresponding request type.
207 execute_reply = QtCore.Signal(object)
208 complete_reply = QtCore.Signal(object)
209 inspect_reply = QtCore.Signal(object)
210 history_reply = QtCore.Signal(object)
211 kernel_info_reply = QtCore.Signal(object)
212
213 def __init__(self, socket, session):
206 def __init__(self, socket, session):
214 super(QtShellChannel, self).__init__(socket, session)
207 super(QtShellChannel, self).__init__(socket, session)
215 self.ioloop = ioloop.IOLoop()
208 self.ioloop = ioloop.IOLoop()
216
209
217 def run(self):
210 def run(self):
218 """The thread's main activity. Call start() instead."""
211 """The thread's main activity. Call start() instead."""
219 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
220 self.stream.on_recv(self._handle_recv)
213 self.stream.on_recv(self._handle_recv)
221 self._run_loop()
214 self._run_loop()
222
215
223 def call_handlers(self, msg):
216 def call_handlers(self, msg):
224 super(QtShellChannel, self).call_handlers(msg)
217 super(QtShellChannel, self).call_handlers(msg)
225
218
226 # Catch kernel_info_reply for message spec adaptation
219 # Catch kernel_info_reply for message spec adaptation
227 msg_type = msg['header']['msg_type']
220 msg_type = msg['header']['msg_type']
228 if msg_type == 'kernel_info_reply':
221 if msg_type == 'kernel_info_reply':
229 self._handle_kernel_info_reply(msg)
222 self._handle_kernel_info_reply(msg)
230
223
231 # Emit specific signals
232 signal = getattr(self, msg_type, None)
233 if signal:
234 signal.emit(msg)
235
236 def _handle_kernel_info_reply(self, msg):
224 def _handle_kernel_info_reply(self, msg):
237 """handle kernel info reply
225 """handle kernel info reply
238
226
239 sets protocol adaptation version
227 sets protocol adaptation version
240 """
228 """
241 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
229 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
242 if adapt_version != major_protocol_version:
230 if adapt_version != major_protocol_version:
243 self.session.adapt_version = adapt_version
231 self.session.adapt_version = adapt_version
244
232
245
233
246 class QtIOPubChannel(QtZMQSocketChannel):
234 class QtIOPubChannel(QtZMQSocketChannel):
247 """The iopub channel which listens for messages that the kernel publishes.
235 """The iopub channel which listens for messages that the kernel publishes.
248
236
249 This channel is where all output is published to frontends.
237 This channel is where all output is published to frontends.
250 """
238 """
251 # Emitted when a message of type 'stream' is received.
252 stream_received = QtCore.Signal(object)
253
254 # Emitted when a message of type 'execute_input' is received.
255 execute_input_received = QtCore.Signal(object)
256
257 # Emitted when a message of type 'execute_result' is received.
258 execute_result_received = QtCore.Signal(object)
259
260 # Emitted when a message of type 'error' is received.
261 error_received = QtCore.Signal(object)
262
263 # Emitted when a message of type 'display_data' is received
264 display_data_received = QtCore.Signal(object)
265
266 # Emitted when a crash report message is received from the kernel's
267 # last-resort sys.excepthook.
268 crash_received = QtCore.Signal(object)
269
270 # Emitted when a shutdown is noticed.
271 shutdown_reply_received = QtCore.Signal(object)
272
239
273 def __init__(self, socket, session):
240 def __init__(self, socket, session):
274 super(QtIOPubChannel, self).__init__(socket, session)
241 super(QtIOPubChannel, self).__init__(socket, session)
275 self.ioloop = ioloop.IOLoop()
242 self.ioloop = ioloop.IOLoop()
276
243
277 def run(self):
244 def run(self):
278 """The thread's main activity. Call start() instead."""
245 """The thread's main activity. Call start() instead."""
279 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
246 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
280 self.stream.on_recv(self._handle_recv)
247 self.stream.on_recv(self._handle_recv)
281 self._run_loop()
248 self._run_loop()
282
249
283 def call_handlers(self, msg):
284 super(QtIOPubChannel, self).call_handlers(msg)
285
286 # Emit signals for specialized message types.
287 msg_type = msg['header']['msg_type']
288 signal = getattr(self, msg_type + '_received', None)
289 if signal:
290 signal.emit(msg)
291
292 def flush(self, timeout=1.0):
250 def flush(self, timeout=1.0):
293 """Immediately processes all pending messages on the iopub channel.
251 """Immediately processes all pending messages on the iopub channel.
294
252
295 Callers should use this method to ensure that :meth:`call_handlers`
253 Callers should use this method to ensure that :meth:`call_handlers`
296 has been called for all messages that have been received on the
254 has been called for all messages that have been received on the
297 0MQ SUB socket of this channel.
255 0MQ SUB socket of this channel.
298
256
299 This method is thread safe.
257 This method is thread safe.
300
258
301 Parameters
259 Parameters
302 ----------
260 ----------
303 timeout : float, optional
261 timeout : float, optional
304 The maximum amount of time to spend flushing, in seconds. The
262 The maximum amount of time to spend flushing, in seconds. The
305 default is one second.
263 default is one second.
306 """
264 """
307 # We do the IOLoop callback process twice to ensure that the IOLoop
265 # We do the IOLoop callback process twice to ensure that the IOLoop
308 # gets to perform at least one full poll.
266 # gets to perform at least one full poll.
309 stop_time = time.time() + timeout
267 stop_time = time.time() + timeout
310 for i in range(2):
268 for i in range(2):
311 self._flushed = False
269 self._flushed = False
312 self.ioloop.add_callback(self._flush)
270 self.ioloop.add_callback(self._flush)
313 while not self._flushed and time.time() < stop_time:
271 while not self._flushed and time.time() < stop_time:
314 time.sleep(0.01)
272 time.sleep(0.01)
315
273
316 def _flush(self):
274 def _flush(self):
317 """Callback for :method:`self.flush`."""
275 """Callback for :method:`self.flush`."""
318 self.stream.flush()
276 self.stream.flush()
319 self._flushed = True
277 self._flushed = True
320
278
321
279
322 class QtStdInChannel(QtZMQSocketChannel):
280 class QtStdInChannel(QtZMQSocketChannel):
323 """The stdin channel to handle raw_input requests that the kernel makes."""
281 """The stdin channel to handle raw_input requests that the kernel makes."""
324
282
325 msg_queue = None
283 msg_queue = None
326 proxy_methods = ['input']
284 proxy_methods = ['input']
327
285
328 # Emitted when an input request is received.
329 input_requested = QtCore.Signal(object)
330
331 def __init__(self, socket, session):
286 def __init__(self, socket, session):
332 super(QtStdInChannel, self).__init__(socket, session)
287 super(QtStdInChannel, self).__init__(socket, session)
333 self.ioloop = ioloop.IOLoop()
288 self.ioloop = ioloop.IOLoop()
334
289
335 def run(self):
290 def run(self):
336 """The thread's main activity. Call start() instead."""
291 """The thread's main activity. Call start() instead."""
337 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
292 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
338 self.stream.on_recv(self._handle_recv)
293 self.stream.on_recv(self._handle_recv)
339 self._run_loop()
294 self._run_loop()
340
295
341 def call_handlers(self, msg):
342 super(QtStdInChannel, self).call_handlers(msg)
343
344 # Emit signals for specialized message types.
345 msg_type = msg['header']['msg_type']
346 if msg_type == 'input_request':
347 self.input_requested.emit(msg)
348
349
296
350 ShellChannelABC.register(QtShellChannel)
297 ShellChannelABC.register(QtShellChannel)
351 IOPubChannelABC.register(QtIOPubChannel)
298 IOPubChannelABC.register(QtIOPubChannel)
352 StdInChannelABC.register(QtStdInChannel)
299 StdInChannelABC.register(QtStdInChannel)
353
300
354
301
355 class QtKernelClient(QtKernelClientMixin, KernelClient):
302 class QtKernelClient(QtKernelClientMixin, KernelClient):
356 """ A KernelClient that provides signals and slots.
303 """ A KernelClient that provides signals and slots.
357 """
304 """
358 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
305 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
359 if shell:
306 if shell:
360 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
307 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
361 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
308 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
362
309
363 def _handle_kernel_info_reply(self, msg):
310 def _check_kernel_info_reply(self, msg):
364 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
311 if msg['msg_type'] == 'kernel_info_reply':
365 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
312 self._handle_kernel_info_reply(msg)
313 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
366
314
367 iopub_channel_class = Type(QtIOPubChannel)
315 iopub_channel_class = Type(QtIOPubChannel)
368 shell_channel_class = Type(QtShellChannel)
316 shell_channel_class = Type(QtShellChannel)
369 stdin_channel_class = Type(QtStdInChannel)
317 stdin_channel_class = Type(QtStdInChannel)
370 hb_channel_class = Type(QtHBChannel)
318 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now