##// END OF EJS Templates
Get rid of QtIOPubChannel class
Thomas Kluyver -
Show More
@@ -1,264 +1,257
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject, Thread):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62 _exiting = False
62 _exiting = False
63 proxy_methods = []
63 proxy_methods = []
64
64
65 # Emitted when the channel is started.
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
66 started = QtCore.Signal()
67
67
68 # Emitted when the channel is stopped.
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
69 stopped = QtCore.Signal()
70
70
71 message_received = QtCore.Signal(object)
71 message_received = QtCore.Signal(object)
72
72
73 #---------------------------------------------------------------------------
73 #---------------------------------------------------------------------------
74 # InProcessChannel interface
74 # InProcessChannel interface
75 #---------------------------------------------------------------------------
75 #---------------------------------------------------------------------------
76
76
77 def call_handlers_later(self, *args, **kwds):
77 def call_handlers_later(self, *args, **kwds):
78 """ Call the message handlers later.
78 """ Call the message handlers later.
79 """
79 """
80 do_later = lambda: self.call_handlers(*args, **kwds)
80 do_later = lambda: self.call_handlers(*args, **kwds)
81 QtCore.QTimer.singleShot(0, do_later)
81 QtCore.QTimer.singleShot(0, do_later)
82
82
83 def process_events(self):
83 def process_events(self):
84 """ Process any pending GUI events.
84 """ Process any pending GUI events.
85 """
85 """
86 QtCore.QCoreApplication.instance().processEvents()
86 QtCore.QCoreApplication.instance().processEvents()
87
87
88 def __init__(self, socket, session):
88 def __init__(self, socket, session):
89 """Create a channel.
89 """Create a channel.
90
90
91 Parameters
91 Parameters
92 ----------
92 ----------
93 context : :class:`zmq.Context`
93 context : :class:`zmq.Context`
94 The ZMQ context to use.
94 The ZMQ context to use.
95 session : :class:`session.Session`
95 session : :class:`session.Session`
96 The session to use.
96 The session to use.
97 address : zmq url
97 address : zmq url
98 Standard (ip, port) tuple that the kernel is listening on.
98 Standard (ip, port) tuple that the kernel is listening on.
99 """
99 """
100 super(QtZMQSocketChannel, self).__init__()
100 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
101 self.daemon = True
102
102
103 self.socket = socket
103 self.socket = socket
104 self.session = session
104 self.session = session
105 atexit.register(self._notice_exit)
105 atexit.register(self._notice_exit)
106 self.ioloop = ioloop.IOLoop()
106 self.ioloop = ioloop.IOLoop()
107
107
108 def _notice_exit(self):
108 def _notice_exit(self):
109 self._exiting = True
109 self._exiting = True
110
110
111 def _run_loop(self):
111 def _run_loop(self):
112 """Run my loop, ignoring EINTR events in the poller"""
112 """Run my loop, ignoring EINTR events in the poller"""
113 while True:
113 while True:
114 try:
114 try:
115 self.ioloop.start()
115 self.ioloop.start()
116 except ZMQError as e:
116 except ZMQError as e:
117 if e.errno == errno.EINTR:
117 if e.errno == errno.EINTR:
118 continue
118 continue
119 else:
119 else:
120 raise
120 raise
121 except Exception:
121 except Exception:
122 if self._exiting:
122 if self._exiting:
123 break
123 break
124 else:
124 else:
125 raise
125 raise
126 else:
126 else:
127 break
127 break
128
128
129 def start(self):
129 def start(self):
130 """ Reimplemented to emit signal.
130 """ Reimplemented to emit signal.
131 """
131 """
132 super(QtZMQSocketChannel, self).start()
132 super(QtZMQSocketChannel, self).start()
133 self.started.emit()
133 self.started.emit()
134
134
135 def run(self):
135 def run(self):
136 """The thread's main activity. Call start() instead."""
136 """The thread's main activity. Call start() instead."""
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 self.stream.on_recv(self._handle_recv)
138 self.stream.on_recv(self._handle_recv)
139 self._run_loop()
139 self._run_loop()
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the channel's event loop and join its thread.
142 """Stop the channel's event loop and join its thread.
143
143
144 This calls :meth:`~threading.Thread.join` and returns when the thread
144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
145 terminates. :class:`RuntimeError` will be raised if
146 :meth:`~threading.Thread.start` is called again.
146 :meth:`~threading.Thread.start` is called again.
147 """
147 """
148 if self.ioloop is not None:
148 if self.ioloop is not None:
149 self.ioloop.stop()
149 self.ioloop.stop()
150 self.join()
150 self.join()
151 self.close()
151 self.close()
152 self.stopped.emit()
152 self.stopped.emit()
153
153
154 def close(self):
154 def close(self):
155 if self.ioloop is not None:
155 if self.ioloop is not None:
156 try:
156 try:
157 self.ioloop.close(all_fds=True)
157 self.ioloop.close(all_fds=True)
158 except Exception:
158 except Exception:
159 pass
159 pass
160 if self.socket is not None:
160 if self.socket is not None:
161 try:
161 try:
162 self.socket.close(linger=0)
162 self.socket.close(linger=0)
163 except Exception:
163 except Exception:
164 pass
164 pass
165 self.socket = None
165 self.socket = None
166
166
167 @property
167 @property
168 def address(self):
168 def address(self):
169 """Get the channel's address as a zmq url string.
169 """Get the channel's address as a zmq url string.
170
170
171 These URLS have the form: 'tcp://127.0.0.1:5555'.
171 These URLS have the form: 'tcp://127.0.0.1:5555'.
172 """
172 """
173 return self._address
173 return self._address
174
174
175 def _queue_send(self, msg):
175 def _queue_send(self, msg):
176 """Queue a message to be sent from the IOLoop's thread.
176 """Queue a message to be sent from the IOLoop's thread.
177
177
178 Parameters
178 Parameters
179 ----------
179 ----------
180 msg : message to send
180 msg : message to send
181
181
182 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
182 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
183 thread control of the action.
183 thread control of the action.
184 """
184 """
185 def thread_send():
185 def thread_send():
186 self.session.send(self.stream, msg)
186 self.session.send(self.stream, msg)
187 self.ioloop.add_callback(thread_send)
187 self.ioloop.add_callback(thread_send)
188
188
189 def _handle_recv(self, msg):
189 def _handle_recv(self, msg):
190 """Callback for stream.on_recv.
190 """Callback for stream.on_recv.
191
191
192 Unpacks message, and calls handlers with it.
192 Unpacks message, and calls handlers with it.
193 """
193 """
194 ident,smsg = self.session.feed_identities(msg)
194 ident,smsg = self.session.feed_identities(msg)
195 msg = self.session.deserialize(smsg)
195 msg = self.session.deserialize(smsg)
196 self.call_handlers(msg)
196 self.call_handlers(msg)
197
197
198 def call_handlers(self, msg):
198 def call_handlers(self, msg):
199 """This method is called in the ioloop thread when a message arrives.
199 """This method is called in the ioloop thread when a message arrives.
200
200
201 Subclasses should override this method to handle incoming messages.
201 Subclasses should override this method to handle incoming messages.
202 It is important to remember that this method is called in the thread
202 It is important to remember that this method is called in the thread
203 so that some logic must be done to ensure that the application level
203 so that some logic must be done to ensure that the application level
204 handlers are called in the application thread.
204 handlers are called in the application thread.
205 """
205 """
206 # Emit the generic signal.
206 # Emit the generic signal.
207 self.message_received.emit(msg)
207 self.message_received.emit(msg)
208
208
209
210 class QtIOPubChannel(QtZMQSocketChannel):
211 """The iopub channel which listens for messages that the kernel publishes.
212
213 This channel is where all output is published to frontends.
214 """
215
216 def flush(self, timeout=1.0):
209 def flush(self, timeout=1.0):
217 """Immediately processes all pending messages on the iopub channel.
210 """Immediately processes all pending messages on this channel.
211
212 This is only used for the IOPub channel.
218
213
219 Callers should use this method to ensure that :meth:`call_handlers`
214 Callers should use this method to ensure that :meth:`call_handlers`
220 has been called for all messages that have been received on the
215 has been called for all messages that have been received on the
221 0MQ SUB socket of this channel.
216 0MQ SUB socket of this channel.
222
217
223 This method is thread safe.
218 This method is thread safe.
224
219
225 Parameters
220 Parameters
226 ----------
221 ----------
227 timeout : float, optional
222 timeout : float, optional
228 The maximum amount of time to spend flushing, in seconds. The
223 The maximum amount of time to spend flushing, in seconds. The
229 default is one second.
224 default is one second.
230 """
225 """
231 # We do the IOLoop callback process twice to ensure that the IOLoop
226 # We do the IOLoop callback process twice to ensure that the IOLoop
232 # gets to perform at least one full poll.
227 # gets to perform at least one full poll.
233 stop_time = time.time() + timeout
228 stop_time = time.time() + timeout
234 for i in range(2):
229 for i in range(2):
235 self._flushed = False
230 self._flushed = False
236 self.ioloop.add_callback(self._flush)
231 self.ioloop.add_callback(self._flush)
237 while not self._flushed and time.time() < stop_time:
232 while not self._flushed and time.time() < stop_time:
238 time.sleep(0.01)
233 time.sleep(0.01)
239
234
240 def _flush(self):
235 def _flush(self):
241 """Callback for :method:`self.flush`."""
236 """Callback for :method:`self.flush`."""
242 self.stream.flush()
237 self.stream.flush()
243 self._flushed = True
238 self._flushed = True
244
239
245 IOPubChannelABC.register(QtIOPubChannel)
246
247
240
248 class QtKernelClient(QtKernelClientMixin, KernelClient):
241 class QtKernelClient(QtKernelClientMixin, KernelClient):
249 """ A KernelClient that provides signals and slots.
242 """ A KernelClient that provides signals and slots.
250 """
243 """
251 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
244 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
252 if shell:
245 if shell:
253 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
246 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
254 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
247 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
255
248
256 def _check_kernel_info_reply(self, msg):
249 def _check_kernel_info_reply(self, msg):
257 if msg['msg_type'] == 'kernel_info_reply':
250 if msg['msg_type'] == 'kernel_info_reply':
258 self._handle_kernel_info_reply(msg)
251 self._handle_kernel_info_reply(msg)
259 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
252 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
260
253
261 iopub_channel_class = Type(QtIOPubChannel)
254 iopub_channel_class = Type(QtZMQSocketChannel)
262 shell_channel_class = Type(QtZMQSocketChannel)
255 shell_channel_class = Type(QtZMQSocketChannel)
263 stdin_channel_class = Type(QtZMQSocketChannel)
256 stdin_channel_class = Type(QtZMQSocketChannel)
264 hb_channel_class = Type(QtHBChannel)
257 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now