##// END OF EJS Templates
Collapse ChannelQObject class into QtInProcessChannel
Thomas Kluyver -
Show More
@@ -1,280 +1,270 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import QtKernelClientMixin
22 from .kernel_mixins import QtKernelClientMixin
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(SuperQObject, HBChannel):
25 class QtHBChannel(SuperQObject, HBChannel):
26 # A longer timeout than the base class
26 # A longer timeout than the base class
27 time_to_dead = 3.0
27 time_to_dead = 3.0
28
28
29 # Emitted when the kernel has died.
29 # Emitted when the kernel has died.
30 kernel_died = QtCore.Signal(object)
30 kernel_died = QtCore.Signal(object)
31
31
32 def call_handlers(self, since_last_heartbeat):
32 def call_handlers(self, since_last_heartbeat):
33 """ Reimplemented to emit signals instead of making callbacks.
33 """ Reimplemented to emit signals instead of making callbacks.
34 """
34 """
35 # Emit the generic signal.
35 # Emit the generic signal.
36 self.kernel_died.emit(since_last_heartbeat)
36 self.kernel_died.emit(since_last_heartbeat)
37
37
38 from IPython.core.release import kernel_protocol_version_info
38 from IPython.core.release import kernel_protocol_version_info
39
39
40 from IPython.kernel.channelsabc import (
40 from IPython.kernel.channelsabc import (
41 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
41 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
42 )
42 )
43 from IPython.utils.py3compat import string_types, iteritems
43 from IPython.utils.py3compat import string_types, iteritems
44
44
45 major_protocol_version = kernel_protocol_version_info[0]
45 major_protocol_version = kernel_protocol_version_info[0]
46
46
47 class InvalidPortNumber(Exception):
47 class InvalidPortNumber(Exception):
48 pass
48 pass
49
49
50 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
51 # if they prove to have more generic utility
51 # if they prove to have more generic utility
52
52
53
53
54 def validate_string_dict(dct):
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
56
56
57 Raises ValueError if not."""
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
63
63
64
64
65
65
66 class QtZMQSocketChannel(SuperQObject):
66 class QtZMQSocketChannel(SuperQObject):
67 """A ZMQ socket emitting a Qt signal when a message is received."""
67 """A ZMQ socket emitting a Qt signal when a message is received."""
68 session = None
68 session = None
69 socket = None
69 socket = None
70 ioloop = None
70 ioloop = None
71 stream = None
71 stream = None
72
72
73 message_received = QtCore.Signal(object)
73 message_received = QtCore.Signal(object)
74
74
75 #---------------------------------------------------------------------------
76 # InProcessChannel interface
77 #---------------------------------------------------------------------------
78
79 def call_handlers_later(self, *args, **kwds):
80 """ Call the message handlers later.
81 """
82 do_later = lambda: self.call_handlers(*args, **kwds)
83 QtCore.QTimer.singleShot(0, do_later)
84
85 def process_events(self):
75 def process_events(self):
86 """ Process any pending GUI events.
76 """ Process any pending GUI events.
87 """
77 """
88 QtCore.QCoreApplication.instance().processEvents()
78 QtCore.QCoreApplication.instance().processEvents()
89
79
90 def __init__(self, socket, session, loop):
80 def __init__(self, socket, session, loop):
91 """Create a channel.
81 """Create a channel.
92
82
93 Parameters
83 Parameters
94 ----------
84 ----------
95 socket : :class:`zmq.Socket`
85 socket : :class:`zmq.Socket`
96 The ZMQ socket to use.
86 The ZMQ socket to use.
97 session : :class:`session.Session`
87 session : :class:`session.Session`
98 The session to use.
88 The session to use.
99 loop
89 loop
100 A pyzmq ioloop to connect the socket to using a ZMQStream
90 A pyzmq ioloop to connect the socket to using a ZMQStream
101 """
91 """
102 super(QtZMQSocketChannel, self).__init__()
92 super(QtZMQSocketChannel, self).__init__()
103
93
104 self.socket = socket
94 self.socket = socket
105 self.session = session
95 self.session = session
106 self.ioloop = loop
96 self.ioloop = loop
107
97
108 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
109 self.stream.on_recv(self._handle_recv)
99 self.stream.on_recv(self._handle_recv)
110
100
111 _is_alive = False
101 _is_alive = False
112 def is_alive(self):
102 def is_alive(self):
113 return self._is_alive
103 return self._is_alive
114
104
115 def start(self):
105 def start(self):
116 self._is_alive = True
106 self._is_alive = True
117
107
118 def stop(self):
108 def stop(self):
119 self._is_alive = False
109 self._is_alive = False
120
110
121 def close(self):
111 def close(self):
122 if self.socket is not None:
112 if self.socket is not None:
123 try:
113 try:
124 self.socket.close(linger=0)
114 self.socket.close(linger=0)
125 except Exception:
115 except Exception:
126 pass
116 pass
127 self.socket = None
117 self.socket = None
128
118
129 def _queue_send(self, msg):
119 def _queue_send(self, msg):
130 """Queue a message to be sent from the IOLoop's thread.
120 """Queue a message to be sent from the IOLoop's thread.
131
121
132 Parameters
122 Parameters
133 ----------
123 ----------
134 msg : message to send
124 msg : message to send
135
125
136 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
137 thread control of the action.
127 thread control of the action.
138 """
128 """
139 def thread_send():
129 def thread_send():
140 self.session.send(self.stream, msg)
130 self.session.send(self.stream, msg)
141 self.ioloop.add_callback(thread_send)
131 self.ioloop.add_callback(thread_send)
142
132
143 def _handle_recv(self, msg):
133 def _handle_recv(self, msg):
144 """Callback for stream.on_recv.
134 """Callback for stream.on_recv.
145
135
146 Unpacks message, and calls handlers with it.
136 Unpacks message, and calls handlers with it.
147 """
137 """
148 ident,smsg = self.session.feed_identities(msg)
138 ident,smsg = self.session.feed_identities(msg)
149 msg = self.session.deserialize(smsg)
139 msg = self.session.deserialize(smsg)
150 self.call_handlers(msg)
140 self.call_handlers(msg)
151
141
152 def call_handlers(self, msg):
142 def call_handlers(self, msg):
153 """This method is called in the ioloop thread when a message arrives.
143 """This method is called in the ioloop thread when a message arrives.
154
144
155 Subclasses should override this method to handle incoming messages.
145 Subclasses should override this method to handle incoming messages.
156 It is important to remember that this method is called in the thread
146 It is important to remember that this method is called in the thread
157 so that some logic must be done to ensure that the application level
147 so that some logic must be done to ensure that the application level
158 handlers are called in the application thread.
148 handlers are called in the application thread.
159 """
149 """
160 # Emit the generic signal.
150 # Emit the generic signal.
161 self.message_received.emit(msg)
151 self.message_received.emit(msg)
162
152
163 def flush(self, timeout=1.0):
153 def flush(self, timeout=1.0):
164 """Immediately processes all pending messages on this channel.
154 """Immediately processes all pending messages on this channel.
165
155
166 This is only used for the IOPub channel.
156 This is only used for the IOPub channel.
167
157
168 Callers should use this method to ensure that :meth:`call_handlers`
158 Callers should use this method to ensure that :meth:`call_handlers`
169 has been called for all messages that have been received on the
159 has been called for all messages that have been received on the
170 0MQ SUB socket of this channel.
160 0MQ SUB socket of this channel.
171
161
172 This method is thread safe.
162 This method is thread safe.
173
163
174 Parameters
164 Parameters
175 ----------
165 ----------
176 timeout : float, optional
166 timeout : float, optional
177 The maximum amount of time to spend flushing, in seconds. The
167 The maximum amount of time to spend flushing, in seconds. The
178 default is one second.
168 default is one second.
179 """
169 """
180 # We do the IOLoop callback process twice to ensure that the IOLoop
170 # We do the IOLoop callback process twice to ensure that the IOLoop
181 # gets to perform at least one full poll.
171 # gets to perform at least one full poll.
182 stop_time = time.time() + timeout
172 stop_time = time.time() + timeout
183 for i in range(2):
173 for i in range(2):
184 self._flushed = False
174 self._flushed = False
185 self.ioloop.add_callback(self._flush)
175 self.ioloop.add_callback(self._flush)
186 while not self._flushed and time.time() < stop_time:
176 while not self._flushed and time.time() < stop_time:
187 time.sleep(0.01)
177 time.sleep(0.01)
188
178
189 def _flush(self):
179 def _flush(self):
190 """Callback for :method:`self.flush`."""
180 """Callback for :method:`self.flush`."""
191 self.stream.flush()
181 self.stream.flush()
192 self._flushed = True
182 self._flushed = True
193
183
194
184
195 class IOLoopThread(Thread):
185 class IOLoopThread(Thread):
196 """Run a pyzmq ioloop in a thread to send and receive messages
186 """Run a pyzmq ioloop in a thread to send and receive messages
197 """
187 """
198 def __init__(self, loop):
188 def __init__(self, loop):
199 super(IOLoopThread, self).__init__()
189 super(IOLoopThread, self).__init__()
200 self.daemon = True
190 self.daemon = True
201 atexit.register(self._notice_exit)
191 atexit.register(self._notice_exit)
202 self.ioloop = loop or ioloop.IOLoop()
192 self.ioloop = loop or ioloop.IOLoop()
203
193
204 def _notice_exit(self):
194 def _notice_exit(self):
205 self._exiting = True
195 self._exiting = True
206
196
207 def run(self):
197 def run(self):
208 """Run my loop, ignoring EINTR events in the poller"""
198 """Run my loop, ignoring EINTR events in the poller"""
209 while True:
199 while True:
210 try:
200 try:
211 self.ioloop.start()
201 self.ioloop.start()
212 except ZMQError as e:
202 except ZMQError as e:
213 if e.errno == errno.EINTR:
203 if e.errno == errno.EINTR:
214 continue
204 continue
215 else:
205 else:
216 raise
206 raise
217 except Exception:
207 except Exception:
218 if self._exiting:
208 if self._exiting:
219 break
209 break
220 else:
210 else:
221 raise
211 raise
222 else:
212 else:
223 break
213 break
224
214
225 def stop(self):
215 def stop(self):
226 """Stop the channel's event loop and join its thread.
216 """Stop the channel's event loop and join its thread.
227
217
228 This calls :meth:`~threading.Thread.join` and returns when the thread
218 This calls :meth:`~threading.Thread.join` and returns when the thread
229 terminates. :class:`RuntimeError` will be raised if
219 terminates. :class:`RuntimeError` will be raised if
230 :meth:`~threading.Thread.start` is called again.
220 :meth:`~threading.Thread.start` is called again.
231 """
221 """
232 if self.ioloop is not None:
222 if self.ioloop is not None:
233 self.ioloop.stop()
223 self.ioloop.stop()
234 self.join()
224 self.join()
235 self.close()
225 self.close()
236
226
237 def close(self):
227 def close(self):
238 if self.ioloop is not None:
228 if self.ioloop is not None:
239 try:
229 try:
240 self.ioloop.close(all_fds=True)
230 self.ioloop.close(all_fds=True)
241 except Exception:
231 except Exception:
242 pass
232 pass
243
233
244
234
245 class QtKernelClient(QtKernelClientMixin, KernelClient):
235 class QtKernelClient(QtKernelClientMixin, KernelClient):
246 """ A KernelClient that provides signals and slots.
236 """ A KernelClient that provides signals and slots.
247 """
237 """
248
238
249 _ioloop = None
239 _ioloop = None
250 @property
240 @property
251 def ioloop(self):
241 def ioloop(self):
252 if self._ioloop is None:
242 if self._ioloop is None:
253 self._ioloop = ioloop.IOLoop()
243 self._ioloop = ioloop.IOLoop()
254 return self._ioloop
244 return self._ioloop
255
245
256 ioloop_thread = Instance(IOLoopThread)
246 ioloop_thread = Instance(IOLoopThread)
257
247
258 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
259 if shell:
249 if shell:
260 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
261
251
262 self.ioloop_thread = IOLoopThread(self.ioloop)
252 self.ioloop_thread = IOLoopThread(self.ioloop)
263 self.ioloop_thread.start()
253 self.ioloop_thread.start()
264
254
265 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
266
256
267 def _check_kernel_info_reply(self, msg):
257 def _check_kernel_info_reply(self, msg):
268 if msg['msg_type'] == 'kernel_info_reply':
258 if msg['msg_type'] == 'kernel_info_reply':
269 self._handle_kernel_info_reply(msg)
259 self._handle_kernel_info_reply(msg)
270 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
271
261
272 def stop_channels(self):
262 def stop_channels(self):
273 super(QtKernelClient, self).stop_channels()
263 super(QtKernelClient, self).stop_channels()
274 if self.ioloop_thread.is_alive():
264 if self.ioloop_thread.is_alive():
275 self.ioloop_thread.stop()
265 self.ioloop_thread.stop()
276
266
277 iopub_channel_class = Type(QtZMQSocketChannel)
267 iopub_channel_class = Type(QtZMQSocketChannel)
278 shell_channel_class = Type(QtZMQSocketChannel)
268 shell_channel_class = Type(QtZMQSocketChannel)
279 stdin_channel_class = Type(QtZMQSocketChannel)
269 stdin_channel_class = Type(QtZMQSocketChannel)
280 hb_channel_class = Type(QtHBChannel)
270 hb_channel_class = Type(QtHBChannel)
@@ -1,30 +1,70 b''
1 """ Defines an in-process KernelManager with signals and slots.
1 """ Defines an in-process KernelManager with signals and slots.
2 """
2 """
3
3
4 # Local imports.
4 # Local imports.
5 from IPython.external.qt import QtCore
5 from IPython.external.qt import QtCore
6 from IPython.kernel.inprocess import (
6 from IPython.kernel.inprocess import (
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 )
8 )
9 from IPython.kernel.inprocess.channels import InProcessChannel
9 from IPython.kernel.inprocess.channels import InProcessChannel
10
10
11 from IPython.utils.traitlets import Type
11 from IPython.utils.traitlets import Type
12 from .kernel_mixins import ( ChannelQObject,
12 from .util import SuperQObject
13 QtKernelClientMixin,
13 from .kernel_mixins import (
14 QtKernelManagerMixin,
14 QtKernelClientMixin, QtKernelManagerMixin,
15 )
15 )
16
16
17 class QtInProcessChannel(ChannelQObject, InProcessChannel):
17 class QtInProcessChannel(SuperQObject, InProcessChannel):
18 pass
18 # Emitted when the channel is started.
19 started = QtCore.Signal()
20
21 # Emitted when the channel is stopped.
22 stopped = QtCore.Signal()
23
24 # Emitted when any message is received.
25 message_received = QtCore.Signal(object)
26
27 def start(self):
28 """ Reimplemented to emit signal.
29 """
30 super(QtInProcessChannel, self).start()
31 self.started.emit()
32
33 def stop(self):
34 """ Reimplemented to emit signal.
35 """
36 super(QtInProcessChannel, self).stop()
37 self.stopped.emit()
38
39 def call_handlers_later(self, *args, **kwds):
40 """ Call the message handlers later.
41 """
42 do_later = lambda: self.call_handlers(*args, **kwds)
43 QtCore.QTimer.singleShot(0, do_later)
44
45 def call_handlers(self, msg):
46 self.message_received.emit(msg)
47
48 def process_events(self):
49 """ Process any pending GUI events.
50 """
51 QtCore.QCoreApplication.instance().processEvents()
52
53 def flush(self, timeout=1.0):
54 """ Reimplemented to ensure that signals are dispatched immediately.
55 """
56 super(QtInProcessChannel, self).flush()
57 self.process_events()
58
19
59
20 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
60 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
21 """ An in-process KernelManager with signals and slots.
61 """ An in-process KernelManager with signals and slots.
22 """
62 """
23
63
24 iopub_channel_class = Type(QtInProcessChannel)
64 iopub_channel_class = Type(QtInProcessChannel)
25 shell_channel_class = Type(QtInProcessChannel)
65 shell_channel_class = Type(QtInProcessChannel)
26 stdin_channel_class = Type(QtInProcessChannel)
66 stdin_channel_class = Type(QtInProcessChannel)
27 hb_channel_class = Type(InProcessHBChannel)
67 hb_channel_class = Type(InProcessHBChannel)
28
68
29 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
69 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
30 client_class = __module__ + '.QtInProcessKernelClient'
70 client_class = __module__ + '.QtInProcessKernelClient'
@@ -1,94 +1,50 b''
1 """Defines a KernelManager that provides signals and slots."""
1 """Defines a KernelManager that provides signals and slots."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from IPython.external.qt import QtCore
6 from IPython.external.qt import QtCore
7
7
8 from IPython.utils.traitlets import HasTraits, Type
8 from IPython.utils.traitlets import HasTraits, Type
9 from .util import MetaQObjectHasTraits, SuperQObject
9 from .util import MetaQObjectHasTraits, SuperQObject
10
10
11
11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 # Emitted when any message is received.
21 message_received = QtCore.Signal(object)
22
23 def start(self):
24 """ Reimplemented to emit signal.
25 """
26 super(ChannelQObject, self).start()
27 self.started.emit()
28
29 def stop(self):
30 """ Reimplemented to emit signal.
31 """
32 super(ChannelQObject, self).stop()
33 self.stopped.emit()
34
35 def call_handlers_later(self, *args, **kwds):
36 """ Call the message handlers later.
37 """
38 do_later = lambda: self.call_handlers(*args, **kwds)
39 QtCore.QTimer.singleShot(0, do_later)
40
41 def call_handlers(self, msg):
42 self.message_received.emit(msg)
43
44 def process_events(self):
45 """ Process any pending GUI events.
46 """
47 QtCore.QCoreApplication.instance().processEvents()
48
49 def flush(self):
50 """ Reimplemented to ensure that signals are dispatched immediately.
51 """
52 super(ChannelQObject, self).flush()
53 self.process_events()
54
55
56 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
12 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
57
13
58 _timer = None
14 _timer = None
59
15
60
16
61 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
17 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
62 """ A KernelClient that provides signals and slots.
18 """ A KernelClient that provides signals and slots.
63 """
19 """
64
20
65 kernel_restarted = QtCore.Signal()
21 kernel_restarted = QtCore.Signal()
66
22
67
23
68 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
24 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
69 """ A KernelClient that provides signals and slots.
25 """ A KernelClient that provides signals and slots.
70 """
26 """
71
27
72 # Emitted when the kernel client has started listening.
28 # Emitted when the kernel client has started listening.
73 started_channels = QtCore.Signal()
29 started_channels = QtCore.Signal()
74
30
75 # Emitted when the kernel client has stopped listening.
31 # Emitted when the kernel client has stopped listening.
76 stopped_channels = QtCore.Signal()
32 stopped_channels = QtCore.Signal()
77
33
78 #---------------------------------------------------------------------------
34 #---------------------------------------------------------------------------
79 # 'KernelClient' interface
35 # 'KernelClient' interface
80 #---------------------------------------------------------------------------
36 #---------------------------------------------------------------------------
81
37
82 #------ Channel management -------------------------------------------------
38 #------ Channel management -------------------------------------------------
83
39
84 def start_channels(self, *args, **kw):
40 def start_channels(self, *args, **kw):
85 """ Reimplemented to emit signal.
41 """ Reimplemented to emit signal.
86 """
42 """
87 super(QtKernelClientMixin, self).start_channels(*args, **kw)
43 super(QtKernelClientMixin, self).start_channels(*args, **kw)
88 self.started_channels.emit()
44 self.started_channels.emit()
89
45
90 def stop_channels(self):
46 def stop_channels(self):
91 """ Reimplemented to emit signal.
47 """ Reimplemented to emit signal.
92 """
48 """
93 super(QtKernelClientMixin, self).stop_channels()
49 super(QtKernelClientMixin, self).stop_channels()
94 self.stopped_channels.emit()
50 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now