##// END OF EJS Templates
Get rid of QtIOPubChannel class
Thomas Kluyver -
Show More
@@ -1,264 +1,257
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject, Thread):
57 57 """The base class for the channels that use ZMQ sockets."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 62 _exiting = False
63 63 proxy_methods = []
64 64
65 65 # Emitted when the channel is started.
66 66 started = QtCore.Signal()
67 67
68 68 # Emitted when the channel is stopped.
69 69 stopped = QtCore.Signal()
70 70
71 71 message_received = QtCore.Signal(object)
72 72
73 73 #---------------------------------------------------------------------------
74 74 # InProcessChannel interface
75 75 #---------------------------------------------------------------------------
76 76
77 77 def call_handlers_later(self, *args, **kwds):
78 78 """ Call the message handlers later.
79 79 """
80 80 do_later = lambda: self.call_handlers(*args, **kwds)
81 81 QtCore.QTimer.singleShot(0, do_later)
82 82
83 83 def process_events(self):
84 84 """ Process any pending GUI events.
85 85 """
86 86 QtCore.QCoreApplication.instance().processEvents()
87 87
88 88 def __init__(self, socket, session):
89 89 """Create a channel.
90 90
91 91 Parameters
92 92 ----------
93 93 context : :class:`zmq.Context`
94 94 The ZMQ context to use.
95 95 session : :class:`session.Session`
96 96 The session to use.
97 97 address : zmq url
98 98 Standard (ip, port) tuple that the kernel is listening on.
99 99 """
100 100 super(QtZMQSocketChannel, self).__init__()
101 101 self.daemon = True
102 102
103 103 self.socket = socket
104 104 self.session = session
105 105 atexit.register(self._notice_exit)
106 106 self.ioloop = ioloop.IOLoop()
107 107
108 108 def _notice_exit(self):
109 109 self._exiting = True
110 110
111 111 def _run_loop(self):
112 112 """Run my loop, ignoring EINTR events in the poller"""
113 113 while True:
114 114 try:
115 115 self.ioloop.start()
116 116 except ZMQError as e:
117 117 if e.errno == errno.EINTR:
118 118 continue
119 119 else:
120 120 raise
121 121 except Exception:
122 122 if self._exiting:
123 123 break
124 124 else:
125 125 raise
126 126 else:
127 127 break
128 128
129 129 def start(self):
130 130 """ Reimplemented to emit signal.
131 131 """
132 132 super(QtZMQSocketChannel, self).start()
133 133 self.started.emit()
134 134
135 135 def run(self):
136 136 """The thread's main activity. Call start() instead."""
137 137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 138 self.stream.on_recv(self._handle_recv)
139 139 self._run_loop()
140 140
141 141 def stop(self):
142 142 """Stop the channel's event loop and join its thread.
143 143
144 144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 145 terminates. :class:`RuntimeError` will be raised if
146 146 :meth:`~threading.Thread.start` is called again.
147 147 """
148 148 if self.ioloop is not None:
149 149 self.ioloop.stop()
150 150 self.join()
151 151 self.close()
152 152 self.stopped.emit()
153 153
154 154 def close(self):
155 155 if self.ioloop is not None:
156 156 try:
157 157 self.ioloop.close(all_fds=True)
158 158 except Exception:
159 159 pass
160 160 if self.socket is not None:
161 161 try:
162 162 self.socket.close(linger=0)
163 163 except Exception:
164 164 pass
165 165 self.socket = None
166 166
167 167 @property
168 168 def address(self):
169 169 """Get the channel's address as a zmq url string.
170 170
171 171 These URLS have the form: 'tcp://127.0.0.1:5555'.
172 172 """
173 173 return self._address
174 174
175 175 def _queue_send(self, msg):
176 176 """Queue a message to be sent from the IOLoop's thread.
177 177
178 178 Parameters
179 179 ----------
180 180 msg : message to send
181 181
182 182 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
183 183 thread control of the action.
184 184 """
185 185 def thread_send():
186 186 self.session.send(self.stream, msg)
187 187 self.ioloop.add_callback(thread_send)
188 188
189 189 def _handle_recv(self, msg):
190 190 """Callback for stream.on_recv.
191 191
192 192 Unpacks message, and calls handlers with it.
193 193 """
194 194 ident,smsg = self.session.feed_identities(msg)
195 195 msg = self.session.deserialize(smsg)
196 196 self.call_handlers(msg)
197 197
198 198 def call_handlers(self, msg):
199 199 """This method is called in the ioloop thread when a message arrives.
200 200
201 201 Subclasses should override this method to handle incoming messages.
202 202 It is important to remember that this method is called in the thread
203 203 so that some logic must be done to ensure that the application level
204 204 handlers are called in the application thread.
205 205 """
206 206 # Emit the generic signal.
207 207 self.message_received.emit(msg)
208 208
209
210 class QtIOPubChannel(QtZMQSocketChannel):
211 """The iopub channel which listens for messages that the kernel publishes.
212
213 This channel is where all output is published to frontends.
214 """
215
216 209 def flush(self, timeout=1.0):
217 """Immediately processes all pending messages on the iopub channel.
210 """Immediately processes all pending messages on this channel.
211
212 This is only used for the IOPub channel.
218 213
219 214 Callers should use this method to ensure that :meth:`call_handlers`
220 215 has been called for all messages that have been received on the
221 216 0MQ SUB socket of this channel.
222 217
223 218 This method is thread safe.
224 219
225 220 Parameters
226 221 ----------
227 222 timeout : float, optional
228 223 The maximum amount of time to spend flushing, in seconds. The
229 224 default is one second.
230 225 """
231 226 # We do the IOLoop callback process twice to ensure that the IOLoop
232 227 # gets to perform at least one full poll.
233 228 stop_time = time.time() + timeout
234 229 for i in range(2):
235 230 self._flushed = False
236 231 self.ioloop.add_callback(self._flush)
237 232 while not self._flushed and time.time() < stop_time:
238 233 time.sleep(0.01)
239 234
240 235 def _flush(self):
241 236 """Callback for :method:`self.flush`."""
242 237 self.stream.flush()
243 238 self._flushed = True
244 239
245 IOPubChannelABC.register(QtIOPubChannel)
246
247 240
248 241 class QtKernelClient(QtKernelClientMixin, KernelClient):
249 242 """ A KernelClient that provides signals and slots.
250 243 """
251 244 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
252 245 if shell:
253 246 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
254 247 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
255 248
256 249 def _check_kernel_info_reply(self, msg):
257 250 if msg['msg_type'] == 'kernel_info_reply':
258 251 self._handle_kernel_info_reply(msg)
259 252 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
260 253
261 iopub_channel_class = Type(QtIOPubChannel)
254 iopub_channel_class = Type(QtZMQSocketChannel)
262 255 shell_channel_class = Type(QtZMQSocketChannel)
263 256 stdin_channel_class = Type(QtZMQSocketChannel)
264 257 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now