##// END OF EJS Templates
Collapse ChannelQObject class into QtInProcessChannel
Thomas Kluyver -
Show More
@@ -1,280 +1,270 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import QtKernelClientMixin
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(SuperQObject, HBChannel):
26 26 # A longer timeout than the base class
27 27 time_to_dead = 3.0
28 28
29 29 # Emitted when the kernel has died.
30 30 kernel_died = QtCore.Signal(object)
31 31
32 32 def call_handlers(self, since_last_heartbeat):
33 33 """ Reimplemented to emit signals instead of making callbacks.
34 34 """
35 35 # Emit the generic signal.
36 36 self.kernel_died.emit(since_last_heartbeat)
37 37
38 38 from IPython.core.release import kernel_protocol_version_info
39 39
40 40 from IPython.kernel.channelsabc import (
41 41 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
42 42 )
43 43 from IPython.utils.py3compat import string_types, iteritems
44 44
45 45 major_protocol_version = kernel_protocol_version_info[0]
46 46
47 47 class InvalidPortNumber(Exception):
48 48 pass
49 49
50 50 # some utilities to validate message structure, these might get moved elsewhere
51 51 # if they prove to have more generic utility
52 52
53 53
54 54 def validate_string_dict(dct):
55 55 """Validate that the input is a dict with string keys and values.
56 56
57 57 Raises ValueError if not."""
58 58 for k,v in iteritems(dct):
59 59 if not isinstance(k, string_types):
60 60 raise ValueError('key %r in dict must be a string' % k)
61 61 if not isinstance(v, string_types):
62 62 raise ValueError('value %r in dict must be a string' % v)
63 63
64 64
65 65
66 66 class QtZMQSocketChannel(SuperQObject):
67 67 """A ZMQ socket emitting a Qt signal when a message is received."""
68 68 session = None
69 69 socket = None
70 70 ioloop = None
71 71 stream = None
72 72
73 73 message_received = QtCore.Signal(object)
74 74
75 #---------------------------------------------------------------------------
76 # InProcessChannel interface
77 #---------------------------------------------------------------------------
78
79 def call_handlers_later(self, *args, **kwds):
80 """ Call the message handlers later.
81 """
82 do_later = lambda: self.call_handlers(*args, **kwds)
83 QtCore.QTimer.singleShot(0, do_later)
84
85 75 def process_events(self):
86 76 """ Process any pending GUI events.
87 77 """
88 78 QtCore.QCoreApplication.instance().processEvents()
89 79
90 80 def __init__(self, socket, session, loop):
91 81 """Create a channel.
92 82
93 83 Parameters
94 84 ----------
95 85 socket : :class:`zmq.Socket`
96 86 The ZMQ socket to use.
97 87 session : :class:`session.Session`
98 88 The session to use.
99 89 loop
100 90 A pyzmq ioloop to connect the socket to using a ZMQStream
101 91 """
102 92 super(QtZMQSocketChannel, self).__init__()
103 93
104 94 self.socket = socket
105 95 self.session = session
106 96 self.ioloop = loop
107 97
108 98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
109 99 self.stream.on_recv(self._handle_recv)
110 100
111 101 _is_alive = False
112 102 def is_alive(self):
113 103 return self._is_alive
114 104
115 105 def start(self):
116 106 self._is_alive = True
117 107
118 108 def stop(self):
119 109 self._is_alive = False
120 110
121 111 def close(self):
122 112 if self.socket is not None:
123 113 try:
124 114 self.socket.close(linger=0)
125 115 except Exception:
126 116 pass
127 117 self.socket = None
128 118
129 119 def _queue_send(self, msg):
130 120 """Queue a message to be sent from the IOLoop's thread.
131 121
132 122 Parameters
133 123 ----------
134 124 msg : message to send
135 125
136 126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
137 127 thread control of the action.
138 128 """
139 129 def thread_send():
140 130 self.session.send(self.stream, msg)
141 131 self.ioloop.add_callback(thread_send)
142 132
143 133 def _handle_recv(self, msg):
144 134 """Callback for stream.on_recv.
145 135
146 136 Unpacks message, and calls handlers with it.
147 137 """
148 138 ident,smsg = self.session.feed_identities(msg)
149 139 msg = self.session.deserialize(smsg)
150 140 self.call_handlers(msg)
151 141
152 142 def call_handlers(self, msg):
153 143 """This method is called in the ioloop thread when a message arrives.
154 144
155 145 Subclasses should override this method to handle incoming messages.
156 146 It is important to remember that this method is called in the thread
157 147 so that some logic must be done to ensure that the application level
158 148 handlers are called in the application thread.
159 149 """
160 150 # Emit the generic signal.
161 151 self.message_received.emit(msg)
162 152
163 153 def flush(self, timeout=1.0):
164 154 """Immediately processes all pending messages on this channel.
165 155
166 156 This is only used for the IOPub channel.
167 157
168 158 Callers should use this method to ensure that :meth:`call_handlers`
169 159 has been called for all messages that have been received on the
170 160 0MQ SUB socket of this channel.
171 161
172 162 This method is thread safe.
173 163
174 164 Parameters
175 165 ----------
176 166 timeout : float, optional
177 167 The maximum amount of time to spend flushing, in seconds. The
178 168 default is one second.
179 169 """
180 170 # We do the IOLoop callback process twice to ensure that the IOLoop
181 171 # gets to perform at least one full poll.
182 172 stop_time = time.time() + timeout
183 173 for i in range(2):
184 174 self._flushed = False
185 175 self.ioloop.add_callback(self._flush)
186 176 while not self._flushed and time.time() < stop_time:
187 177 time.sleep(0.01)
188 178
189 179 def _flush(self):
190 180 """Callback for :method:`self.flush`."""
191 181 self.stream.flush()
192 182 self._flushed = True
193 183
194 184
195 185 class IOLoopThread(Thread):
196 186 """Run a pyzmq ioloop in a thread to send and receive messages
197 187 """
198 188 def __init__(self, loop):
199 189 super(IOLoopThread, self).__init__()
200 190 self.daemon = True
201 191 atexit.register(self._notice_exit)
202 192 self.ioloop = loop or ioloop.IOLoop()
203 193
204 194 def _notice_exit(self):
205 195 self._exiting = True
206 196
207 197 def run(self):
208 198 """Run my loop, ignoring EINTR events in the poller"""
209 199 while True:
210 200 try:
211 201 self.ioloop.start()
212 202 except ZMQError as e:
213 203 if e.errno == errno.EINTR:
214 204 continue
215 205 else:
216 206 raise
217 207 except Exception:
218 208 if self._exiting:
219 209 break
220 210 else:
221 211 raise
222 212 else:
223 213 break
224 214
225 215 def stop(self):
226 216 """Stop the channel's event loop and join its thread.
227 217
228 218 This calls :meth:`~threading.Thread.join` and returns when the thread
229 219 terminates. :class:`RuntimeError` will be raised if
230 220 :meth:`~threading.Thread.start` is called again.
231 221 """
232 222 if self.ioloop is not None:
233 223 self.ioloop.stop()
234 224 self.join()
235 225 self.close()
236 226
237 227 def close(self):
238 228 if self.ioloop is not None:
239 229 try:
240 230 self.ioloop.close(all_fds=True)
241 231 except Exception:
242 232 pass
243 233
244 234
245 235 class QtKernelClient(QtKernelClientMixin, KernelClient):
246 236 """ A KernelClient that provides signals and slots.
247 237 """
248 238
249 239 _ioloop = None
250 240 @property
251 241 def ioloop(self):
252 242 if self._ioloop is None:
253 243 self._ioloop = ioloop.IOLoop()
254 244 return self._ioloop
255 245
256 246 ioloop_thread = Instance(IOLoopThread)
257 247
258 248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
259 249 if shell:
260 250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
261 251
262 252 self.ioloop_thread = IOLoopThread(self.ioloop)
263 253 self.ioloop_thread.start()
264 254
265 255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
266 256
267 257 def _check_kernel_info_reply(self, msg):
268 258 if msg['msg_type'] == 'kernel_info_reply':
269 259 self._handle_kernel_info_reply(msg)
270 260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
271 261
272 262 def stop_channels(self):
273 263 super(QtKernelClient, self).stop_channels()
274 264 if self.ioloop_thread.is_alive():
275 265 self.ioloop_thread.stop()
276 266
277 267 iopub_channel_class = Type(QtZMQSocketChannel)
278 268 shell_channel_class = Type(QtZMQSocketChannel)
279 269 stdin_channel_class = Type(QtZMQSocketChannel)
280 270 hb_channel_class = Type(QtHBChannel)
@@ -1,30 +1,70 b''
1 1 """ Defines an in-process KernelManager with signals and slots.
2 2 """
3 3
4 4 # Local imports.
5 5 from IPython.external.qt import QtCore
6 6 from IPython.kernel.inprocess import (
7 7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 8 )
9 9 from IPython.kernel.inprocess.channels import InProcessChannel
10 10
11 11 from IPython.utils.traitlets import Type
12 from .kernel_mixins import ( ChannelQObject,
13 QtKernelClientMixin,
14 QtKernelManagerMixin,
12 from .util import SuperQObject
13 from .kernel_mixins import (
14 QtKernelClientMixin, QtKernelManagerMixin,
15 15 )
16 16
17 class QtInProcessChannel(ChannelQObject, InProcessChannel):
18 pass
17 class QtInProcessChannel(SuperQObject, InProcessChannel):
18 # Emitted when the channel is started.
19 started = QtCore.Signal()
20
21 # Emitted when the channel is stopped.
22 stopped = QtCore.Signal()
23
24 # Emitted when any message is received.
25 message_received = QtCore.Signal(object)
26
27 def start(self):
28 """ Reimplemented to emit signal.
29 """
30 super(QtInProcessChannel, self).start()
31 self.started.emit()
32
33 def stop(self):
34 """ Reimplemented to emit signal.
35 """
36 super(QtInProcessChannel, self).stop()
37 self.stopped.emit()
38
39 def call_handlers_later(self, *args, **kwds):
40 """ Call the message handlers later.
41 """
42 do_later = lambda: self.call_handlers(*args, **kwds)
43 QtCore.QTimer.singleShot(0, do_later)
44
45 def call_handlers(self, msg):
46 self.message_received.emit(msg)
47
48 def process_events(self):
49 """ Process any pending GUI events.
50 """
51 QtCore.QCoreApplication.instance().processEvents()
52
53 def flush(self, timeout=1.0):
54 """ Reimplemented to ensure that signals are dispatched immediately.
55 """
56 super(QtInProcessChannel, self).flush()
57 self.process_events()
58
19 59
20 60 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
21 61 """ An in-process KernelManager with signals and slots.
22 62 """
23 63
24 64 iopub_channel_class = Type(QtInProcessChannel)
25 65 shell_channel_class = Type(QtInProcessChannel)
26 66 stdin_channel_class = Type(QtInProcessChannel)
27 67 hb_channel_class = Type(InProcessHBChannel)
28 68
29 69 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
30 70 client_class = __module__ + '.QtInProcessKernelClient'
@@ -1,94 +1,50 b''
1 1 """Defines a KernelManager that provides signals and slots."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from IPython.external.qt import QtCore
7 7
8 8 from IPython.utils.traitlets import HasTraits, Type
9 9 from .util import MetaQObjectHasTraits, SuperQObject
10 10
11 11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 # Emitted when any message is received.
21 message_received = QtCore.Signal(object)
22
23 def start(self):
24 """ Reimplemented to emit signal.
25 """
26 super(ChannelQObject, self).start()
27 self.started.emit()
28
29 def stop(self):
30 """ Reimplemented to emit signal.
31 """
32 super(ChannelQObject, self).stop()
33 self.stopped.emit()
34
35 def call_handlers_later(self, *args, **kwds):
36 """ Call the message handlers later.
37 """
38 do_later = lambda: self.call_handlers(*args, **kwds)
39 QtCore.QTimer.singleShot(0, do_later)
40
41 def call_handlers(self, msg):
42 self.message_received.emit(msg)
43
44 def process_events(self):
45 """ Process any pending GUI events.
46 """
47 QtCore.QCoreApplication.instance().processEvents()
48
49 def flush(self):
50 """ Reimplemented to ensure that signals are dispatched immediately.
51 """
52 super(ChannelQObject, self).flush()
53 self.process_events()
54
55
56 12 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
57 13
58 14 _timer = None
59 15
60 16
61 17 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
62 18 """ A KernelClient that provides signals and slots.
63 19 """
64 20
65 21 kernel_restarted = QtCore.Signal()
66 22
67 23
68 24 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
69 25 """ A KernelClient that provides signals and slots.
70 26 """
71 27
72 28 # Emitted when the kernel client has started listening.
73 29 started_channels = QtCore.Signal()
74 30
75 31 # Emitted when the kernel client has stopped listening.
76 32 stopped_channels = QtCore.Signal()
77 33
78 34 #---------------------------------------------------------------------------
79 35 # 'KernelClient' interface
80 36 #---------------------------------------------------------------------------
81 37
82 38 #------ Channel management -------------------------------------------------
83 39
84 40 def start_channels(self, *args, **kw):
85 41 """ Reimplemented to emit signal.
86 42 """
87 43 super(QtKernelClientMixin, self).start_channels(*args, **kw)
88 44 self.started_channels.emit()
89 45
90 46 def stop_channels(self):
91 47 """ Reimplemented to emit signal.
92 48 """
93 49 super(QtKernelClientMixin, self).stop_channels()
94 50 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now