##// END OF EJS Templates
Share an IOLoop among Qt channels, rather than one each
Thomas Kluyver -
Show More
@@ -51,17 +51,17 b' class ZMQSocketChannel(object):'
51 _exiting = False
51 _exiting = False
52 proxy_methods = []
52 proxy_methods = []
53
53
54 def __init__(self, socket, session):
54 def __init__(self, socket, session, loop=None):
55 """Create a channel.
55 """Create a channel.
56
56
57 Parameters
57 Parameters
58 ----------
58 ----------
59 context : :class:`zmq.Context`
59 socket : :class:`zmq.Socket`
60 The ZMQ context to use.
60 The ZMQ socket to use.
61 session : :class:`session.Session`
61 session : :class:`session.Session`
62 The session to use.
62 The session to use.
63 address : zmq url
63 loop
64 Standard (ip, port) tuple that the kernel is listening on.
64 Unused here, for other implementations
65 """
65 """
66 super(ZMQSocketChannel, self).__init__()
66 super(ZMQSocketChannel, self).__init__()
67
67
@@ -124,6 +124,8 b' class KernelClient(ConnectionFileMixin):'
124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
126
126
127 ioloop = None # Overridden in subclasses that use pyzmq event loop
128
127 @property
129 @property
128 def shell_channel(self):
130 def shell_channel(self):
129 """Get the shell channel object for this kernel."""
131 """Get the shell channel object for this kernel."""
@@ -132,7 +134,7 b' class KernelClient(ConnectionFileMixin):'
132 self.log.debug("connecting shell channel to %s", url)
134 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
135 socket = make_shell_socket(self.context, self.session.bsession, url)
134 self._shell_channel = self.shell_channel_class(
136 self._shell_channel = self.shell_channel_class(
135 socket, self.session
137 socket, self.session, self.ioloop
136 )
138 )
137 return self._shell_channel
139 return self._shell_channel
138
140
@@ -144,7 +146,7 b' class KernelClient(ConnectionFileMixin):'
144 self.log.debug("connecting iopub channel to %s", url)
146 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
147 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 self._iopub_channel = self.iopub_channel_class(
148 self._iopub_channel = self.iopub_channel_class(
147 socket, self.session
149 socket, self.session, self.ioloop
148 )
150 )
149 return self._iopub_channel
151 return self._iopub_channel
150
152
@@ -156,7 +158,7 b' class KernelClient(ConnectionFileMixin):'
156 self.log.debug("connecting stdin channel to %s", url)
158 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
159 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 self._stdin_channel = self.stdin_channel_class(
160 self._stdin_channel = self.stdin_channel_class(
159 socket, self.session
161 socket, self.session, self.ioloop
160 )
162 )
161 return self._stdin_channel
163 return self._stdin_channel
162
164
@@ -14,7 +14,7 b' from zmq.eventloop import ioloop, zmqstream'
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
@@ -53,20 +53,12 b' def validate_string_dict(dct):'
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62 _exiting = False
63 proxy_methods = []
64
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
67
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
70
62
71 message_received = QtCore.Signal(object)
63 message_received = QtCore.Signal(object)
72
64
@@ -85,78 +77,38 b' class QtZMQSocketChannel(SuperQObject, Thread):'
85 """
77 """
86 QtCore.QCoreApplication.instance().processEvents()
78 QtCore.QCoreApplication.instance().processEvents()
87
79
88 def __init__(self, socket, session):
80 def __init__(self, socket, session, loop):
89 """Create a channel.
81 """Create a channel.
90
82
91 Parameters
83 Parameters
92 ----------
84 ----------
93 context : :class:`zmq.Context`
85 socket : :class:`zmq.Socket`
94 The ZMQ context to use.
86 The ZMQ socket to use.
95 session : :class:`session.Session`
87 session : :class:`session.Session`
96 The session to use.
88 The session to use.
97 address : zmq url
89 loop
98 Standard (ip, port) tuple that the kernel is listening on.
90 A pyzmq ioloop to connect the socket to using a ZMQStream
99 """
91 """
100 super(QtZMQSocketChannel, self).__init__()
92 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
102
93
103 self.socket = socket
94 self.socket = socket
104 self.session = session
95 self.session = session
105 atexit.register(self._notice_exit)
96 self.ioloop = loop
106 self.ioloop = ioloop.IOLoop()
107
97
108 def _notice_exit(self):
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
109 self._exiting = True
99 self.stream.on_recv(self._handle_recv)
110
100
111 def _run_loop(self):
101 _is_alive = False
112 """Run my loop, ignoring EINTR events in the poller"""
102 def is_alive(self):
113 while True:
103 return self._is_alive
114 try:
115 self.ioloop.start()
116 except ZMQError as e:
117 if e.errno == errno.EINTR:
118 continue
119 else:
120 raise
121 except Exception:
122 if self._exiting:
123 break
124 else:
125 raise
126 else:
127 break
128
104
129 def start(self):
105 def start(self):
130 """ Reimplemented to emit signal.
106 self._is_alive = True
131 """
132 super(QtZMQSocketChannel, self).start()
133 self.started.emit()
134
135 def run(self):
136 """The thread's main activity. Call start() instead."""
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 self.stream.on_recv(self._handle_recv)
139 self._run_loop()
140
107
141 def stop(self):
108 def stop(self):
142 """Stop the channel's event loop and join its thread.
109 self._is_alive = False
143
144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
146 :meth:`~threading.Thread.start` is called again.
147 """
148 if self.ioloop is not None:
149 self.ioloop.stop()
150 self.join()
151 self.close()
152 self.stopped.emit()
153
110
154 def close(self):
111 def close(self):
155 if self.ioloop is not None:
156 try:
157 self.ioloop.close(all_fds=True)
158 except Exception:
159 pass
160 if self.socket is not None:
112 if self.socket is not None:
161 try:
113 try:
162 self.socket.close(linger=0)
114 self.socket.close(linger=0)
@@ -238,12 +190,76 b' class QtZMQSocketChannel(SuperQObject, Thread):'
238 self._flushed = True
190 self._flushed = True
239
191
240
192
193 class IOLoopThread(Thread):
194 """Run a pyzmq ioloop in a thread to send and receive messages
195 """
196 def __init__(self, loop):
197 super(IOLoopThread, self).__init__()
198 self.daemon = True
199 atexit.register(self._notice_exit)
200 self.ioloop = loop or ioloop.IOLoop()
201
202 def _notice_exit(self):
203 self._exiting = True
204
205 def run(self):
206 """Run my loop, ignoring EINTR events in the poller"""
207 while True:
208 try:
209 self.ioloop.start()
210 except ZMQError as e:
211 if e.errno == errno.EINTR:
212 continue
213 else:
214 raise
215 except Exception:
216 if self._exiting:
217 break
218 else:
219 raise
220 else:
221 break
222
223 def stop(self):
224 """Stop the channel's event loop and join its thread.
225
226 This calls :meth:`~threading.Thread.join` and returns when the thread
227 terminates. :class:`RuntimeError` will be raised if
228 :meth:`~threading.Thread.start` is called again.
229 """
230 if self.ioloop is not None:
231 self.ioloop.stop()
232 self.join()
233 self.close()
234
235 def close(self):
236 if self.ioloop is not None:
237 try:
238 self.ioloop.close(all_fds=True)
239 except Exception:
240 pass
241
242
241 class QtKernelClient(QtKernelClientMixin, KernelClient):
243 class QtKernelClient(QtKernelClientMixin, KernelClient):
242 """ A KernelClient that provides signals and slots.
244 """ A KernelClient that provides signals and slots.
243 """
245 """
246
247 _ioloop = None
248 @property
249 def ioloop(self):
250 if self._ioloop is None:
251 self._ioloop = ioloop.IOLoop()
252 return self._ioloop
253
254 ioloop_thread = Instance(IOLoopThread)
255
244 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
256 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
245 if shell:
257 if shell:
246 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
258 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259
260 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 self.channel_listener_thread.start()
262
247 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
263 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
248
264
249 def _check_kernel_info_reply(self, msg):
265 def _check_kernel_info_reply(self, msg):
@@ -251,6 +267,11 b' class QtKernelClient(QtKernelClientMixin, KernelClient):'
251 self._handle_kernel_info_reply(msg)
267 self._handle_kernel_info_reply(msg)
252 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
268 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
253
269
270 def stop_channels(self):
271 super(QtKernelClient, self).stop_channels()
272 if self.ioloop_thread.is_alive():
273 self.ioloop_thread.stop()
274
254 iopub_channel_class = Type(QtZMQSocketChannel)
275 iopub_channel_class = Type(QtZMQSocketChannel)
255 shell_channel_class = Type(QtZMQSocketChannel)
276 shell_channel_class = Type(QtZMQSocketChannel)
256 stdin_channel_class = Type(QtZMQSocketChannel)
277 stdin_channel_class = Type(QtZMQSocketChannel)
General Comments 0
You need to be logged in to leave comments. Login now