##// END OF EJS Templates
Share an IOLoop among Qt channels, rather than one each
Thomas Kluyver -
Show More
@@ -51,17 +51,17 class ZMQSocketChannel(object):
51 51 _exiting = False
52 52 proxy_methods = []
53 53
54 def __init__(self, socket, session):
54 def __init__(self, socket, session, loop=None):
55 55 """Create a channel.
56 56
57 57 Parameters
58 58 ----------
59 context : :class:`zmq.Context`
60 The ZMQ context to use.
59 socket : :class:`zmq.Socket`
60 The ZMQ socket to use.
61 61 session : :class:`session.Session`
62 62 The session to use.
63 address : zmq url
64 Standard (ip, port) tuple that the kernel is listening on.
63 loop
64 Unused here, for other implementations
65 65 """
66 66 super(ZMQSocketChannel, self).__init__()
67 67
@@ -124,6 +124,8 class KernelClient(ConnectionFileMixin):
124 124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
125 125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
126 126
127 ioloop = None # Overridden in subclasses that use pyzmq event loop
128
127 129 @property
128 130 def shell_channel(self):
129 131 """Get the shell channel object for this kernel."""
@@ -132,7 +134,7 class KernelClient(ConnectionFileMixin):
132 134 self.log.debug("connecting shell channel to %s", url)
133 135 socket = make_shell_socket(self.context, self.session.bsession, url)
134 136 self._shell_channel = self.shell_channel_class(
135 socket, self.session
137 socket, self.session, self.ioloop
136 138 )
137 139 return self._shell_channel
138 140
@@ -144,7 +146,7 class KernelClient(ConnectionFileMixin):
144 146 self.log.debug("connecting iopub channel to %s", url)
145 147 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 148 self._iopub_channel = self.iopub_channel_class(
147 socket, self.session
149 socket, self.session, self.ioloop
148 150 )
149 151 return self._iopub_channel
150 152
@@ -156,7 +158,7 class KernelClient(ConnectionFileMixin):
156 158 self.log.debug("connecting stdin channel to %s", url)
157 159 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 160 self._stdin_channel = self.stdin_channel_class(
159 socket, self.session
161 socket, self.session, self.ioloop
160 162 )
161 163 return self._stdin_channel
162 164
@@ -14,7 +14,7 from zmq.eventloop import ioloop, zmqstream
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
@@ -53,20 +53,12 def validate_string_dict(dct):
53 53
54 54
55 55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject):
57 57 """The base class for the channels that use ZMQ sockets."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 _exiting = False
63 proxy_methods = []
64
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
67
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
70 62
71 63 message_received = QtCore.Signal(object)
72 64
@@ -85,78 +77,38 class QtZMQSocketChannel(SuperQObject, Thread):
85 77 """
86 78 QtCore.QCoreApplication.instance().processEvents()
87 79
88 def __init__(self, socket, session):
80 def __init__(self, socket, session, loop):
89 81 """Create a channel.
90 82
91 83 Parameters
92 84 ----------
93 context : :class:`zmq.Context`
94 The ZMQ context to use.
85 socket : :class:`zmq.Socket`
86 The ZMQ socket to use.
95 87 session : :class:`session.Session`
96 88 The session to use.
97 address : zmq url
98 Standard (ip, port) tuple that the kernel is listening on.
89 loop
90 A pyzmq ioloop to connect the socket to using a ZMQStream
99 91 """
100 92 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
102 93
103 94 self.socket = socket
104 95 self.session = session
105 atexit.register(self._notice_exit)
106 self.ioloop = ioloop.IOLoop()
96 self.ioloop = loop
107 97
108 def _notice_exit(self):
109 self._exiting = True
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 self.stream.on_recv(self._handle_recv)
110 100
111 def _run_loop(self):
112 """Run my loop, ignoring EINTR events in the poller"""
113 while True:
114 try:
115 self.ioloop.start()
116 except ZMQError as e:
117 if e.errno == errno.EINTR:
118 continue
119 else:
120 raise
121 except Exception:
122 if self._exiting:
123 break
124 else:
125 raise
126 else:
127 break
101 _is_alive = False
102 def is_alive(self):
103 return self._is_alive
128 104
129 105 def start(self):
130 """ Reimplemented to emit signal.
131 """
132 super(QtZMQSocketChannel, self).start()
133 self.started.emit()
134
135 def run(self):
136 """The thread's main activity. Call start() instead."""
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 self.stream.on_recv(self._handle_recv)
139 self._run_loop()
106 self._is_alive = True
140 107
141 108 def stop(self):
142 """Stop the channel's event loop and join its thread.
143
144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
146 :meth:`~threading.Thread.start` is called again.
147 """
148 if self.ioloop is not None:
149 self.ioloop.stop()
150 self.join()
151 self.close()
152 self.stopped.emit()
109 self._is_alive = False
153 110
154 111 def close(self):
155 if self.ioloop is not None:
156 try:
157 self.ioloop.close(all_fds=True)
158 except Exception:
159 pass
160 112 if self.socket is not None:
161 113 try:
162 114 self.socket.close(linger=0)
@@ -238,12 +190,76 class QtZMQSocketChannel(SuperQObject, Thread):
238 190 self._flushed = True
239 191
240 192
193 class IOLoopThread(Thread):
194 """Run a pyzmq ioloop in a thread to send and receive messages
195 """
196 def __init__(self, loop):
197 super(IOLoopThread, self).__init__()
198 self.daemon = True
199 atexit.register(self._notice_exit)
200 self.ioloop = loop or ioloop.IOLoop()
201
202 def _notice_exit(self):
203 self._exiting = True
204
205 def run(self):
206 """Run my loop, ignoring EINTR events in the poller"""
207 while True:
208 try:
209 self.ioloop.start()
210 except ZMQError as e:
211 if e.errno == errno.EINTR:
212 continue
213 else:
214 raise
215 except Exception:
216 if self._exiting:
217 break
218 else:
219 raise
220 else:
221 break
222
223 def stop(self):
224 """Stop the channel's event loop and join its thread.
225
226 This calls :meth:`~threading.Thread.join` and returns when the thread
227 terminates. :class:`RuntimeError` will be raised if
228 :meth:`~threading.Thread.start` is called again.
229 """
230 if self.ioloop is not None:
231 self.ioloop.stop()
232 self.join()
233 self.close()
234
235 def close(self):
236 if self.ioloop is not None:
237 try:
238 self.ioloop.close(all_fds=True)
239 except Exception:
240 pass
241
242
241 243 class QtKernelClient(QtKernelClientMixin, KernelClient):
242 244 """ A KernelClient that provides signals and slots.
243 245 """
246
247 _ioloop = None
248 @property
249 def ioloop(self):
250 if self._ioloop is None:
251 self._ioloop = ioloop.IOLoop()
252 return self._ioloop
253
254 ioloop_thread = Instance(IOLoopThread)
255
244 256 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
245 257 if shell:
246 258 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259
260 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 self.channel_listener_thread.start()
262
247 263 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
248 264
249 265 def _check_kernel_info_reply(self, msg):
@@ -251,6 +267,11 class QtKernelClient(QtKernelClientMixin, KernelClient):
251 267 self._handle_kernel_info_reply(msg)
252 268 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
253 269
270 def stop_channels(self):
271 super(QtKernelClient, self).stop_channels()
272 if self.ioloop_thread.is_alive():
273 self.ioloop_thread.stop()
274
254 275 iopub_channel_class = Type(QtZMQSocketChannel)
255 276 shell_channel_class = Type(QtZMQSocketChannel)
256 277 stdin_channel_class = Type(QtZMQSocketChannel)
General Comments 0
You need to be logged in to leave comments. Login now