##// END OF EJS Templates
Fix attribute naming
Thomas Kluyver -
Show More
@@ -1,270 +1,270 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject):
56 class QtZMQSocketChannel(SuperQObject):
57 """A ZMQ socket emitting a Qt signal when a message is received."""
57 """A ZMQ socket emitting a Qt signal when a message is received."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62
62
63 message_received = QtCore.Signal(object)
63 message_received = QtCore.Signal(object)
64
64
65 #---------------------------------------------------------------------------
65 #---------------------------------------------------------------------------
66 # InProcessChannel interface
66 # InProcessChannel interface
67 #---------------------------------------------------------------------------
67 #---------------------------------------------------------------------------
68
68
69 def call_handlers_later(self, *args, **kwds):
69 def call_handlers_later(self, *args, **kwds):
70 """ Call the message handlers later.
70 """ Call the message handlers later.
71 """
71 """
72 do_later = lambda: self.call_handlers(*args, **kwds)
72 do_later = lambda: self.call_handlers(*args, **kwds)
73 QtCore.QTimer.singleShot(0, do_later)
73 QtCore.QTimer.singleShot(0, do_later)
74
74
75 def process_events(self):
75 def process_events(self):
76 """ Process any pending GUI events.
76 """ Process any pending GUI events.
77 """
77 """
78 QtCore.QCoreApplication.instance().processEvents()
78 QtCore.QCoreApplication.instance().processEvents()
79
79
80 def __init__(self, socket, session, loop):
80 def __init__(self, socket, session, loop):
81 """Create a channel.
81 """Create a channel.
82
82
83 Parameters
83 Parameters
84 ----------
84 ----------
85 socket : :class:`zmq.Socket`
85 socket : :class:`zmq.Socket`
86 The ZMQ socket to use.
86 The ZMQ socket to use.
87 session : :class:`session.Session`
87 session : :class:`session.Session`
88 The session to use.
88 The session to use.
89 loop
89 loop
90 A pyzmq ioloop to connect the socket to using a ZMQStream
90 A pyzmq ioloop to connect the socket to using a ZMQStream
91 """
91 """
92 super(QtZMQSocketChannel, self).__init__()
92 super(QtZMQSocketChannel, self).__init__()
93
93
94 self.socket = socket
94 self.socket = socket
95 self.session = session
95 self.session = session
96 self.ioloop = loop
96 self.ioloop = loop
97
97
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 self.stream.on_recv(self._handle_recv)
99 self.stream.on_recv(self._handle_recv)
100
100
101 _is_alive = False
101 _is_alive = False
102 def is_alive(self):
102 def is_alive(self):
103 return self._is_alive
103 return self._is_alive
104
104
105 def start(self):
105 def start(self):
106 self._is_alive = True
106 self._is_alive = True
107
107
108 def stop(self):
108 def stop(self):
109 self._is_alive = False
109 self._is_alive = False
110
110
111 def close(self):
111 def close(self):
112 if self.socket is not None:
112 if self.socket is not None:
113 try:
113 try:
114 self.socket.close(linger=0)
114 self.socket.close(linger=0)
115 except Exception:
115 except Exception:
116 pass
116 pass
117 self.socket = None
117 self.socket = None
118
118
119 def _queue_send(self, msg):
119 def _queue_send(self, msg):
120 """Queue a message to be sent from the IOLoop's thread.
120 """Queue a message to be sent from the IOLoop's thread.
121
121
122 Parameters
122 Parameters
123 ----------
123 ----------
124 msg : message to send
124 msg : message to send
125
125
126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
127 thread control of the action.
127 thread control of the action.
128 """
128 """
129 def thread_send():
129 def thread_send():
130 self.session.send(self.stream, msg)
130 self.session.send(self.stream, msg)
131 self.ioloop.add_callback(thread_send)
131 self.ioloop.add_callback(thread_send)
132
132
133 def _handle_recv(self, msg):
133 def _handle_recv(self, msg):
134 """Callback for stream.on_recv.
134 """Callback for stream.on_recv.
135
135
136 Unpacks message, and calls handlers with it.
136 Unpacks message, and calls handlers with it.
137 """
137 """
138 ident,smsg = self.session.feed_identities(msg)
138 ident,smsg = self.session.feed_identities(msg)
139 msg = self.session.deserialize(smsg)
139 msg = self.session.deserialize(smsg)
140 self.call_handlers(msg)
140 self.call_handlers(msg)
141
141
142 def call_handlers(self, msg):
142 def call_handlers(self, msg):
143 """This method is called in the ioloop thread when a message arrives.
143 """This method is called in the ioloop thread when a message arrives.
144
144
145 Subclasses should override this method to handle incoming messages.
145 Subclasses should override this method to handle incoming messages.
146 It is important to remember that this method is called in the thread
146 It is important to remember that this method is called in the thread
147 so that some logic must be done to ensure that the application level
147 so that some logic must be done to ensure that the application level
148 handlers are called in the application thread.
148 handlers are called in the application thread.
149 """
149 """
150 # Emit the generic signal.
150 # Emit the generic signal.
151 self.message_received.emit(msg)
151 self.message_received.emit(msg)
152
152
153 def flush(self, timeout=1.0):
153 def flush(self, timeout=1.0):
154 """Immediately processes all pending messages on this channel.
154 """Immediately processes all pending messages on this channel.
155
155
156 This is only used for the IOPub channel.
156 This is only used for the IOPub channel.
157
157
158 Callers should use this method to ensure that :meth:`call_handlers`
158 Callers should use this method to ensure that :meth:`call_handlers`
159 has been called for all messages that have been received on the
159 has been called for all messages that have been received on the
160 0MQ SUB socket of this channel.
160 0MQ SUB socket of this channel.
161
161
162 This method is thread safe.
162 This method is thread safe.
163
163
164 Parameters
164 Parameters
165 ----------
165 ----------
166 timeout : float, optional
166 timeout : float, optional
167 The maximum amount of time to spend flushing, in seconds. The
167 The maximum amount of time to spend flushing, in seconds. The
168 default is one second.
168 default is one second.
169 """
169 """
170 # We do the IOLoop callback process twice to ensure that the IOLoop
170 # We do the IOLoop callback process twice to ensure that the IOLoop
171 # gets to perform at least one full poll.
171 # gets to perform at least one full poll.
172 stop_time = time.time() + timeout
172 stop_time = time.time() + timeout
173 for i in range(2):
173 for i in range(2):
174 self._flushed = False
174 self._flushed = False
175 self.ioloop.add_callback(self._flush)
175 self.ioloop.add_callback(self._flush)
176 while not self._flushed and time.time() < stop_time:
176 while not self._flushed and time.time() < stop_time:
177 time.sleep(0.01)
177 time.sleep(0.01)
178
178
179 def _flush(self):
179 def _flush(self):
180 """Callback for :method:`self.flush`."""
180 """Callback for :method:`self.flush`."""
181 self.stream.flush()
181 self.stream.flush()
182 self._flushed = True
182 self._flushed = True
183
183
184
184
185 class IOLoopThread(Thread):
185 class IOLoopThread(Thread):
186 """Run a pyzmq ioloop in a thread to send and receive messages
186 """Run a pyzmq ioloop in a thread to send and receive messages
187 """
187 """
188 def __init__(self, loop):
188 def __init__(self, loop):
189 super(IOLoopThread, self).__init__()
189 super(IOLoopThread, self).__init__()
190 self.daemon = True
190 self.daemon = True
191 atexit.register(self._notice_exit)
191 atexit.register(self._notice_exit)
192 self.ioloop = loop or ioloop.IOLoop()
192 self.ioloop = loop or ioloop.IOLoop()
193
193
194 def _notice_exit(self):
194 def _notice_exit(self):
195 self._exiting = True
195 self._exiting = True
196
196
197 def run(self):
197 def run(self):
198 """Run my loop, ignoring EINTR events in the poller"""
198 """Run my loop, ignoring EINTR events in the poller"""
199 while True:
199 while True:
200 try:
200 try:
201 self.ioloop.start()
201 self.ioloop.start()
202 except ZMQError as e:
202 except ZMQError as e:
203 if e.errno == errno.EINTR:
203 if e.errno == errno.EINTR:
204 continue
204 continue
205 else:
205 else:
206 raise
206 raise
207 except Exception:
207 except Exception:
208 if self._exiting:
208 if self._exiting:
209 break
209 break
210 else:
210 else:
211 raise
211 raise
212 else:
212 else:
213 break
213 break
214
214
215 def stop(self):
215 def stop(self):
216 """Stop the channel's event loop and join its thread.
216 """Stop the channel's event loop and join its thread.
217
217
218 This calls :meth:`~threading.Thread.join` and returns when the thread
218 This calls :meth:`~threading.Thread.join` and returns when the thread
219 terminates. :class:`RuntimeError` will be raised if
219 terminates. :class:`RuntimeError` will be raised if
220 :meth:`~threading.Thread.start` is called again.
220 :meth:`~threading.Thread.start` is called again.
221 """
221 """
222 if self.ioloop is not None:
222 if self.ioloop is not None:
223 self.ioloop.stop()
223 self.ioloop.stop()
224 self.join()
224 self.join()
225 self.close()
225 self.close()
226
226
227 def close(self):
227 def close(self):
228 if self.ioloop is not None:
228 if self.ioloop is not None:
229 try:
229 try:
230 self.ioloop.close(all_fds=True)
230 self.ioloop.close(all_fds=True)
231 except Exception:
231 except Exception:
232 pass
232 pass
233
233
234
234
235 class QtKernelClient(QtKernelClientMixin, KernelClient):
235 class QtKernelClient(QtKernelClientMixin, KernelClient):
236 """ A KernelClient that provides signals and slots.
236 """ A KernelClient that provides signals and slots.
237 """
237 """
238
238
239 _ioloop = None
239 _ioloop = None
240 @property
240 @property
241 def ioloop(self):
241 def ioloop(self):
242 if self._ioloop is None:
242 if self._ioloop is None:
243 self._ioloop = ioloop.IOLoop()
243 self._ioloop = ioloop.IOLoop()
244 return self._ioloop
244 return self._ioloop
245
245
246 ioloop_thread = Instance(IOLoopThread)
246 ioloop_thread = Instance(IOLoopThread)
247
247
248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
249 if shell:
249 if shell:
250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
251
251
252 self.channel_listener_thread = IOLoopThread(self.ioloop)
252 self.ioloop_thread = IOLoopThread(self.ioloop)
253 self.channel_listener_thread.start()
253 self.ioloop_thread.start()
254
254
255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
256
256
257 def _check_kernel_info_reply(self, msg):
257 def _check_kernel_info_reply(self, msg):
258 if msg['msg_type'] == 'kernel_info_reply':
258 if msg['msg_type'] == 'kernel_info_reply':
259 self._handle_kernel_info_reply(msg)
259 self._handle_kernel_info_reply(msg)
260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
261
261
262 def stop_channels(self):
262 def stop_channels(self):
263 super(QtKernelClient, self).stop_channels()
263 super(QtKernelClient, self).stop_channels()
264 if self.ioloop_thread.is_alive():
264 if self.ioloop_thread.is_alive():
265 self.ioloop_thread.stop()
265 self.ioloop_thread.stop()
266
266
267 iopub_channel_class = Type(QtZMQSocketChannel)
267 iopub_channel_class = Type(QtZMQSocketChannel)
268 shell_channel_class = Type(QtZMQSocketChannel)
268 shell_channel_class = Type(QtZMQSocketChannel)
269 stdin_channel_class = Type(QtZMQSocketChannel)
269 stdin_channel_class = Type(QtZMQSocketChannel)
270 hb_channel_class = Type(QtHBChannel)
270 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now