##// END OF EJS Templates
KernelClient: factor out generic threaded client from Qt client
Björn Linse -
Show More
@@ -0,0 +1,229 b''
1 """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
2 """
3 import atexit
4 import errno
5 from threading import Thread
6 import time
7
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13
14 # Local imports
15 from IPython.utils.traitlets import Type, Instance
16 from IPython.kernel.channels import HBChannel
17 from IPython.kernel import KernelClient
18 from IPython.kernel.channels import HBChannel
19
20 class ThreadedZMQSocketChannel(object):
21 """A ZMQ socket invoking a callback in the ioloop"""
22 session = None
23 socket = None
24 ioloop = None
25 stream = None
26 _inspect = None
27
28 def __init__(self, socket, session, loop):
29 """Create a channel.
30
31 Parameters
32 ----------
33 socket : :class:`zmq.Socket`
34 The ZMQ socket to use.
35 session : :class:`session.Session`
36 The session to use.
37 loop
38 A pyzmq ioloop to connect the socket to using a ZMQStream
39 """
40 super(ThreadedZMQSocketChannel, self).__init__()
41
42 self.socket = socket
43 self.session = session
44 self.ioloop = loop
45
46 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
47 self.stream.on_recv(self._handle_recv)
48
49 _is_alive = False
50 def is_alive(self):
51 return self._is_alive
52
53 def start(self):
54 self._is_alive = True
55
56 def stop(self):
57 self._is_alive = False
58
59 def close(self):
60 if self.socket is not None:
61 try:
62 self.socket.close(linger=0)
63 except Exception:
64 pass
65 self.socket = None
66
67 def send(self, msg):
68 """Queue a message to be sent from the IOLoop's thread.
69
70 Parameters
71 ----------
72 msg : message to send
73
74 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
75 thread control of the action.
76 """
77 def thread_send():
78 self.session.send(self.stream, msg)
79 self.ioloop.add_callback(thread_send)
80
81 def _handle_recv(self, msg):
82 """Callback for stream.on_recv.
83
84 Unpacks message, and calls handlers with it.
85 """
86 ident,smsg = self.session.feed_identities(msg)
87 msg = self.session.deserialize(smsg)
88 # let client inspect messages
89 if self._inspect:
90 self._inspect(msg)
91 self.call_handlers(msg)
92
93 def call_handlers(self, msg):
94 """This method is called in the ioloop thread when a message arrives.
95
96 Subclasses should override this method to handle incoming messages.
97 It is important to remember that this method is called in the thread
98 so that some logic must be done to ensure that the application level
99 handlers are called in the application thread.
100 """
101 pass
102
103 def process_events(self):
104 """Subclasses should override this with a method
105 processing any pending GUI events.
106 """
107 pass
108
109
110 def flush(self, timeout=1.0):
111 """Immediately processes all pending messages on this channel.
112
113 This is only used for the IOPub channel.
114
115 Callers should use this method to ensure that :meth:`call_handlers`
116 has been called for all messages that have been received on the
117 0MQ SUB socket of this channel.
118
119 This method is thread safe.
120
121 Parameters
122 ----------
123 timeout : float, optional
124 The maximum amount of time to spend flushing, in seconds. The
125 default is one second.
126 """
127 # We do the IOLoop callback process twice to ensure that the IOLoop
128 # gets to perform at least one full poll.
129 stop_time = time.time() + timeout
130 for i in range(2):
131 self._flushed = False
132 self.ioloop.add_callback(self._flush)
133 while not self._flushed and time.time() < stop_time:
134 time.sleep(0.01)
135
136 def _flush(self):
137 """Callback for :method:`self.flush`."""
138 self.stream.flush()
139 self._flushed = True
140
141
142 class IOLoopThread(Thread):
143 """Run a pyzmq ioloop in a thread to send and receive messages
144 """
145 def __init__(self, loop):
146 super(IOLoopThread, self).__init__()
147 self.daemon = True
148 atexit.register(self._notice_exit)
149 self.ioloop = loop or ioloop.IOLoop()
150
151 def _notice_exit(self):
152 self._exiting = True
153
154 def run(self):
155 """Run my loop, ignoring EINTR events in the poller"""
156 while True:
157 try:
158 self.ioloop.start()
159 except ZMQError as e:
160 if e.errno == errno.EINTR:
161 continue
162 else:
163 raise
164 except Exception:
165 if self._exiting:
166 break
167 else:
168 raise
169 else:
170 break
171
172 def stop(self):
173 """Stop the channel's event loop and join its thread.
174
175 This calls :meth:`~threading.Thread.join` and returns when the thread
176 terminates. :class:`RuntimeError` will be raised if
177 :meth:`~threading.Thread.start` is called again.
178 """
179 if self.ioloop is not None:
180 self.ioloop.stop()
181 self.join()
182 self.close()
183
184 def close(self):
185 if self.ioloop is not None:
186 try:
187 self.ioloop.close(all_fds=True)
188 except Exception:
189 pass
190
191
192 class ThreadedKernelClient(KernelClient):
193 """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
194 """
195
196 _ioloop = None
197 @property
198 def ioloop(self):
199 if self._ioloop is None:
200 self._ioloop = ioloop.IOLoop()
201 return self._ioloop
202
203 ioloop_thread = Instance(IOLoopThread)
204
205 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
206 if shell:
207 self.shell_channel._inspect = self._check_kernel_info_reply
208
209 self.ioloop_thread = IOLoopThread(self.ioloop)
210 self.ioloop_thread.start()
211
212 super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
213
214 def _check_kernel_info_reply(self, msg):
215 """This is run in the ioloop thread when the kernel info reply is recieved
216 """
217 if msg['msg_type'] == 'kernel_info_reply':
218 self._handle_kernel_info_reply(msg)
219 self.shell_channel._inspect = None
220
221 def stop_channels(self):
222 super(ThreadedKernelClient, self).stop_channels()
223 if self.ioloop_thread.is_alive():
224 self.ioloop_thread.stop()
225
226 iopub_channel_class = Type(ThreadedZMQSocketChannel)
227 shell_channel_class = Type(ThreadedZMQSocketChannel)
228 stdin_channel_class = Type(ThreadedZMQSocketChannel)
229 hb_channel_class = Type(HBChannel)
@@ -348,7 +348,8 b' class KernelClient(ConnectionFileMixin):'
348 348 def _handle_kernel_info_reply(self, msg):
349 349 """handle kernel info reply
350 350
351 sets protocol adaptation version
351 sets protocol adaptation version. This might
352 be run from a separate thread.
352 353 """
353 354 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
354 355 if adapt_version != major_protocol_version:
@@ -17,6 +17,8 b' from IPython.external.qt import QtCore'
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel
19 19 from IPython.kernel import KernelClient
20 from IPython.kernel.channels import InvalidPortNumber
21 from IPython.kernel.threaded import ThreadedKernelClient, ThreadedZMQSocketChannel
20 22
21 23 from .kernel_mixins import QtKernelClientMixin
22 24 from .util import SuperQObject
@@ -38,17 +40,8 b' from IPython.core.release import kernel_protocol_version_info'
38 40
39 41 major_protocol_version = kernel_protocol_version_info[0]
40 42
41 class InvalidPortNumber(Exception):
42 pass
43
44
45 class QtZMQSocketChannel(SuperQObject):
43 class QtZMQSocketChannel(ThreadedZMQSocketChannel,SuperQObject):
46 44 """A ZMQ socket emitting a Qt signal when a message is received."""
47 session = None
48 socket = None
49 ioloop = None
50 stream = None
51
52 45 message_received = QtCore.Signal(object)
53 46
54 47 def process_events(self):
@@ -56,72 +49,10 b' class QtZMQSocketChannel(SuperQObject):'
56 49 """
57 50 QtCore.QCoreApplication.instance().processEvents()
58 51
59 def __init__(self, socket, session, loop):
60 """Create a channel.
61
62 Parameters
63 ----------
64 socket : :class:`zmq.Socket`
65 The ZMQ socket to use.
66 session : :class:`session.Session`
67 The session to use.
68 loop
69 A pyzmq ioloop to connect the socket to using a ZMQStream
70 """
71 super(QtZMQSocketChannel, self).__init__()
72
73 self.socket = socket
74 self.session = session
75 self.ioloop = loop
76
77 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream.on_recv(self._handle_recv)
79
80 _is_alive = False
81 def is_alive(self):
82 return self._is_alive
83
84 def start(self):
85 self._is_alive = True
86
87 def stop(self):
88 self._is_alive = False
89
90 def close(self):
91 if self.socket is not None:
92 try:
93 self.socket.close(linger=0)
94 except Exception:
95 pass
96 self.socket = None
97
98 def send(self, msg):
99 """Queue a message to be sent from the IOLoop's thread.
100
101 Parameters
102 ----------
103 msg : message to send
104
105 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 thread control of the action.
107 """
108 def thread_send():
109 self.session.send(self.stream, msg)
110 self.ioloop.add_callback(thread_send)
111
112 def _handle_recv(self, msg):
113 """Callback for stream.on_recv.
114
115 Unpacks message, and calls handlers with it.
116 """
117 ident,smsg = self.session.feed_identities(msg)
118 msg = self.session.deserialize(smsg)
119 self.call_handlers(msg)
120 52
121 53 def call_handlers(self, msg):
122 54 """This method is called in the ioloop thread when a message arrives.
123 55
124 Subclasses should override this method to handle incoming messages.
125 56 It is important to remember that this method is called in the thread
126 57 so that some logic must be done to ensure that the application level
127 58 handlers are called in the application thread.
@@ -129,120 +60,11 b' class QtZMQSocketChannel(SuperQObject):'
129 60 # Emit the generic signal.
130 61 self.message_received.emit(msg)
131 62
132 def flush(self, timeout=1.0):
133 """Immediately processes all pending messages on this channel.
134 63
135 This is only used for the IOPub channel.
136
137 Callers should use this method to ensure that :meth:`call_handlers`
138 has been called for all messages that have been received on the
139 0MQ SUB socket of this channel.
140
141 This method is thread safe.
142
143 Parameters
144 ----------
145 timeout : float, optional
146 The maximum amount of time to spend flushing, in seconds. The
147 default is one second.
148 """
149 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # gets to perform at least one full poll.
151 stop_time = time.time() + timeout
152 for i in range(2):
153 self._flushed = False
154 self.ioloop.add_callback(self._flush)
155 while not self._flushed and time.time() < stop_time:
156 time.sleep(0.01)
157
158 def _flush(self):
159 """Callback for :method:`self.flush`."""
160 self.stream.flush()
161 self._flushed = True
162
163
164 class IOLoopThread(Thread):
165 """Run a pyzmq ioloop in a thread to send and receive messages
166 """
167 def __init__(self, loop):
168 super(IOLoopThread, self).__init__()
169 self.daemon = True
170 atexit.register(self._notice_exit)
171 self.ioloop = loop or ioloop.IOLoop()
172
173 def _notice_exit(self):
174 self._exiting = True
175
176 def run(self):
177 """Run my loop, ignoring EINTR events in the poller"""
178 while True:
179 try:
180 self.ioloop.start()
181 except ZMQError as e:
182 if e.errno == errno.EINTR:
183 continue
184 else:
185 raise
186 except Exception:
187 if self._exiting:
188 break
189 else:
190 raise
191 else:
192 break
193
194 def stop(self):
195 """Stop the channel's event loop and join its thread.
196
197 This calls :meth:`~threading.Thread.join` and returns when the thread
198 terminates. :class:`RuntimeError` will be raised if
199 :meth:`~threading.Thread.start` is called again.
200 """
201 if self.ioloop is not None:
202 self.ioloop.stop()
203 self.join()
204 self.close()
205
206 def close(self):
207 if self.ioloop is not None:
208 try:
209 self.ioloop.close(all_fds=True)
210 except Exception:
211 pass
212
213
214 class QtKernelClient(QtKernelClientMixin, KernelClient):
64 class QtKernelClient(QtKernelClientMixin, ThreadedKernelClient):
215 65 """ A KernelClient that provides signals and slots.
216 66 """
217 67
218 _ioloop = None
219 @property
220 def ioloop(self):
221 if self._ioloop is None:
222 self._ioloop = ioloop.IOLoop()
223 return self._ioloop
224
225 ioloop_thread = Instance(IOLoopThread)
226
227 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 if shell:
229 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230
231 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread.start()
233
234 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235
236 def _check_kernel_info_reply(self, msg):
237 if msg['msg_type'] == 'kernel_info_reply':
238 self._handle_kernel_info_reply(msg)
239 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240
241 def stop_channels(self):
242 super(QtKernelClient, self).stop_channels()
243 if self.ioloop_thread.is_alive():
244 self.ioloop_thread.stop()
245
246 68 iopub_channel_class = Type(QtZMQSocketChannel)
247 69 shell_channel_class = Type(QtZMQSocketChannel)
248 70 stdin_channel_class = Type(QtZMQSocketChannel)
General Comments 0
You need to be logged in to leave comments. Login now