##// END OF EJS Templates
Fix attribute naming
Thomas Kluyver -
Show More
@@ -1,270 +1,270 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject):
57 57 """A ZMQ socket emitting a Qt signal when a message is received."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 62
63 63 message_received = QtCore.Signal(object)
64 64
65 65 #---------------------------------------------------------------------------
66 66 # InProcessChannel interface
67 67 #---------------------------------------------------------------------------
68 68
69 69 def call_handlers_later(self, *args, **kwds):
70 70 """ Call the message handlers later.
71 71 """
72 72 do_later = lambda: self.call_handlers(*args, **kwds)
73 73 QtCore.QTimer.singleShot(0, do_later)
74 74
75 75 def process_events(self):
76 76 """ Process any pending GUI events.
77 77 """
78 78 QtCore.QCoreApplication.instance().processEvents()
79 79
80 80 def __init__(self, socket, session, loop):
81 81 """Create a channel.
82 82
83 83 Parameters
84 84 ----------
85 85 socket : :class:`zmq.Socket`
86 86 The ZMQ socket to use.
87 87 session : :class:`session.Session`
88 88 The session to use.
89 89 loop
90 90 A pyzmq ioloop to connect the socket to using a ZMQStream
91 91 """
92 92 super(QtZMQSocketChannel, self).__init__()
93 93
94 94 self.socket = socket
95 95 self.session = session
96 96 self.ioloop = loop
97 97
98 98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 99 self.stream.on_recv(self._handle_recv)
100 100
101 101 _is_alive = False
102 102 def is_alive(self):
103 103 return self._is_alive
104 104
105 105 def start(self):
106 106 self._is_alive = True
107 107
108 108 def stop(self):
109 109 self._is_alive = False
110 110
111 111 def close(self):
112 112 if self.socket is not None:
113 113 try:
114 114 self.socket.close(linger=0)
115 115 except Exception:
116 116 pass
117 117 self.socket = None
118 118
119 119 def _queue_send(self, msg):
120 120 """Queue a message to be sent from the IOLoop's thread.
121 121
122 122 Parameters
123 123 ----------
124 124 msg : message to send
125 125
126 126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
127 127 thread control of the action.
128 128 """
129 129 def thread_send():
130 130 self.session.send(self.stream, msg)
131 131 self.ioloop.add_callback(thread_send)
132 132
133 133 def _handle_recv(self, msg):
134 134 """Callback for stream.on_recv.
135 135
136 136 Unpacks message, and calls handlers with it.
137 137 """
138 138 ident,smsg = self.session.feed_identities(msg)
139 139 msg = self.session.deserialize(smsg)
140 140 self.call_handlers(msg)
141 141
142 142 def call_handlers(self, msg):
143 143 """This method is called in the ioloop thread when a message arrives.
144 144
145 145 Subclasses should override this method to handle incoming messages.
146 146 It is important to remember that this method is called in the thread
147 147 so that some logic must be done to ensure that the application level
148 148 handlers are called in the application thread.
149 149 """
150 150 # Emit the generic signal.
151 151 self.message_received.emit(msg)
152 152
153 153 def flush(self, timeout=1.0):
154 154 """Immediately processes all pending messages on this channel.
155 155
156 156 This is only used for the IOPub channel.
157 157
158 158 Callers should use this method to ensure that :meth:`call_handlers`
159 159 has been called for all messages that have been received on the
160 160 0MQ SUB socket of this channel.
161 161
162 162 This method is thread safe.
163 163
164 164 Parameters
165 165 ----------
166 166 timeout : float, optional
167 167 The maximum amount of time to spend flushing, in seconds. The
168 168 default is one second.
169 169 """
170 170 # We do the IOLoop callback process twice to ensure that the IOLoop
171 171 # gets to perform at least one full poll.
172 172 stop_time = time.time() + timeout
173 173 for i in range(2):
174 174 self._flushed = False
175 175 self.ioloop.add_callback(self._flush)
176 176 while not self._flushed and time.time() < stop_time:
177 177 time.sleep(0.01)
178 178
179 179 def _flush(self):
180 180 """Callback for :method:`self.flush`."""
181 181 self.stream.flush()
182 182 self._flushed = True
183 183
184 184
185 185 class IOLoopThread(Thread):
186 186 """Run a pyzmq ioloop in a thread to send and receive messages
187 187 """
188 188 def __init__(self, loop):
189 189 super(IOLoopThread, self).__init__()
190 190 self.daemon = True
191 191 atexit.register(self._notice_exit)
192 192 self.ioloop = loop or ioloop.IOLoop()
193 193
194 194 def _notice_exit(self):
195 195 self._exiting = True
196 196
197 197 def run(self):
198 198 """Run my loop, ignoring EINTR events in the poller"""
199 199 while True:
200 200 try:
201 201 self.ioloop.start()
202 202 except ZMQError as e:
203 203 if e.errno == errno.EINTR:
204 204 continue
205 205 else:
206 206 raise
207 207 except Exception:
208 208 if self._exiting:
209 209 break
210 210 else:
211 211 raise
212 212 else:
213 213 break
214 214
215 215 def stop(self):
216 216 """Stop the channel's event loop and join its thread.
217 217
218 218 This calls :meth:`~threading.Thread.join` and returns when the thread
219 219 terminates. :class:`RuntimeError` will be raised if
220 220 :meth:`~threading.Thread.start` is called again.
221 221 """
222 222 if self.ioloop is not None:
223 223 self.ioloop.stop()
224 224 self.join()
225 225 self.close()
226 226
227 227 def close(self):
228 228 if self.ioloop is not None:
229 229 try:
230 230 self.ioloop.close(all_fds=True)
231 231 except Exception:
232 232 pass
233 233
234 234
235 235 class QtKernelClient(QtKernelClientMixin, KernelClient):
236 236 """ A KernelClient that provides signals and slots.
237 237 """
238 238
239 239 _ioloop = None
240 240 @property
241 241 def ioloop(self):
242 242 if self._ioloop is None:
243 243 self._ioloop = ioloop.IOLoop()
244 244 return self._ioloop
245 245
246 246 ioloop_thread = Instance(IOLoopThread)
247 247
248 248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
249 249 if shell:
250 250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
251 251
252 self.channel_listener_thread = IOLoopThread(self.ioloop)
253 self.channel_listener_thread.start()
252 self.ioloop_thread = IOLoopThread(self.ioloop)
253 self.ioloop_thread.start()
254 254
255 255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
256 256
257 257 def _check_kernel_info_reply(self, msg):
258 258 if msg['msg_type'] == 'kernel_info_reply':
259 259 self._handle_kernel_info_reply(msg)
260 260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
261 261
262 262 def stop_channels(self):
263 263 super(QtKernelClient, self).stop_channels()
264 264 if self.ioloop_thread.is_alive():
265 265 self.ioloop_thread.stop()
266 266
267 267 iopub_channel_class = Type(QtZMQSocketChannel)
268 268 shell_channel_class = Type(QtZMQSocketChannel)
269 269 stdin_channel_class = Type(QtZMQSocketChannel)
270 270 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now