##// END OF EJS Templates
fix `zmq` module conflict
Björn Linse -
Show More
@@ -1,229 +1,230
1 """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
1 """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
2 """
2 """
3 from __future__ import absolute_import
3 import atexit
4 import atexit
4 import errno
5 import errno
5 from threading import Thread
6 from threading import Thread
6 import time
7 import time
7
8
8 import zmq
9 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13 from zmq.eventloop import ioloop, zmqstream
13
14
14 # Local imports
15 # Local imports
15 from IPython.utils.traitlets import Type, Instance
16 from IPython.utils.traitlets import Type, Instance
16 from IPython.kernel.channels import HBChannel
17 from IPython.kernel.channels import HBChannel
17 from IPython.kernel import KernelClient
18 from IPython.kernel import KernelClient
18 from IPython.kernel.channels import HBChannel
19 from IPython.kernel.channels import HBChannel
19
20
20 class ThreadedZMQSocketChannel(object):
21 class ThreadedZMQSocketChannel(object):
21 """A ZMQ socket invoking a callback in the ioloop"""
22 """A ZMQ socket invoking a callback in the ioloop"""
22 session = None
23 session = None
23 socket = None
24 socket = None
24 ioloop = None
25 ioloop = None
25 stream = None
26 stream = None
26 _inspect = None
27 _inspect = None
27
28
28 def __init__(self, socket, session, loop):
29 def __init__(self, socket, session, loop):
29 """Create a channel.
30 """Create a channel.
30
31
31 Parameters
32 Parameters
32 ----------
33 ----------
33 socket : :class:`zmq.Socket`
34 socket : :class:`zmq.Socket`
34 The ZMQ socket to use.
35 The ZMQ socket to use.
35 session : :class:`session.Session`
36 session : :class:`session.Session`
36 The session to use.
37 The session to use.
37 loop
38 loop
38 A pyzmq ioloop to connect the socket to using a ZMQStream
39 A pyzmq ioloop to connect the socket to using a ZMQStream
39 """
40 """
40 super(ThreadedZMQSocketChannel, self).__init__()
41 super(ThreadedZMQSocketChannel, self).__init__()
41
42
42 self.socket = socket
43 self.socket = socket
43 self.session = session
44 self.session = session
44 self.ioloop = loop
45 self.ioloop = loop
45
46
46 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
47 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
47 self.stream.on_recv(self._handle_recv)
48 self.stream.on_recv(self._handle_recv)
48
49
49 _is_alive = False
50 _is_alive = False
50 def is_alive(self):
51 def is_alive(self):
51 return self._is_alive
52 return self._is_alive
52
53
53 def start(self):
54 def start(self):
54 self._is_alive = True
55 self._is_alive = True
55
56
56 def stop(self):
57 def stop(self):
57 self._is_alive = False
58 self._is_alive = False
58
59
59 def close(self):
60 def close(self):
60 if self.socket is not None:
61 if self.socket is not None:
61 try:
62 try:
62 self.socket.close(linger=0)
63 self.socket.close(linger=0)
63 except Exception:
64 except Exception:
64 pass
65 pass
65 self.socket = None
66 self.socket = None
66
67
67 def send(self, msg):
68 def send(self, msg):
68 """Queue a message to be sent from the IOLoop's thread.
69 """Queue a message to be sent from the IOLoop's thread.
69
70
70 Parameters
71 Parameters
71 ----------
72 ----------
72 msg : message to send
73 msg : message to send
73
74
74 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
75 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
75 thread control of the action.
76 thread control of the action.
76 """
77 """
77 def thread_send():
78 def thread_send():
78 self.session.send(self.stream, msg)
79 self.session.send(self.stream, msg)
79 self.ioloop.add_callback(thread_send)
80 self.ioloop.add_callback(thread_send)
80
81
81 def _handle_recv(self, msg):
82 def _handle_recv(self, msg):
82 """Callback for stream.on_recv.
83 """Callback for stream.on_recv.
83
84
84 Unpacks message, and calls handlers with it.
85 Unpacks message, and calls handlers with it.
85 """
86 """
86 ident,smsg = self.session.feed_identities(msg)
87 ident,smsg = self.session.feed_identities(msg)
87 msg = self.session.deserialize(smsg)
88 msg = self.session.deserialize(smsg)
88 # let client inspect messages
89 # let client inspect messages
89 if self._inspect:
90 if self._inspect:
90 self._inspect(msg)
91 self._inspect(msg)
91 self.call_handlers(msg)
92 self.call_handlers(msg)
92
93
93 def call_handlers(self, msg):
94 def call_handlers(self, msg):
94 """This method is called in the ioloop thread when a message arrives.
95 """This method is called in the ioloop thread when a message arrives.
95
96
96 Subclasses should override this method to handle incoming messages.
97 Subclasses should override this method to handle incoming messages.
97 It is important to remember that this method is called in the thread
98 It is important to remember that this method is called in the thread
98 so that some logic must be done to ensure that the application level
99 so that some logic must be done to ensure that the application level
99 handlers are called in the application thread.
100 handlers are called in the application thread.
100 """
101 """
101 pass
102 pass
102
103
103 def process_events(self):
104 def process_events(self):
104 """Subclasses should override this with a method
105 """Subclasses should override this with a method
105 processing any pending GUI events.
106 processing any pending GUI events.
106 """
107 """
107 pass
108 pass
108
109
109
110
110 def flush(self, timeout=1.0):
111 def flush(self, timeout=1.0):
111 """Immediately processes all pending messages on this channel.
112 """Immediately processes all pending messages on this channel.
112
113
113 This is only used for the IOPub channel.
114 This is only used for the IOPub channel.
114
115
115 Callers should use this method to ensure that :meth:`call_handlers`
116 Callers should use this method to ensure that :meth:`call_handlers`
116 has been called for all messages that have been received on the
117 has been called for all messages that have been received on the
117 0MQ SUB socket of this channel.
118 0MQ SUB socket of this channel.
118
119
119 This method is thread safe.
120 This method is thread safe.
120
121
121 Parameters
122 Parameters
122 ----------
123 ----------
123 timeout : float, optional
124 timeout : float, optional
124 The maximum amount of time to spend flushing, in seconds. The
125 The maximum amount of time to spend flushing, in seconds. The
125 default is one second.
126 default is one second.
126 """
127 """
127 # We do the IOLoop callback process twice to ensure that the IOLoop
128 # We do the IOLoop callback process twice to ensure that the IOLoop
128 # gets to perform at least one full poll.
129 # gets to perform at least one full poll.
129 stop_time = time.time() + timeout
130 stop_time = time.time() + timeout
130 for i in range(2):
131 for i in range(2):
131 self._flushed = False
132 self._flushed = False
132 self.ioloop.add_callback(self._flush)
133 self.ioloop.add_callback(self._flush)
133 while not self._flushed and time.time() < stop_time:
134 while not self._flushed and time.time() < stop_time:
134 time.sleep(0.01)
135 time.sleep(0.01)
135
136
136 def _flush(self):
137 def _flush(self):
137 """Callback for :method:`self.flush`."""
138 """Callback for :method:`self.flush`."""
138 self.stream.flush()
139 self.stream.flush()
139 self._flushed = True
140 self._flushed = True
140
141
141
142
142 class IOLoopThread(Thread):
143 class IOLoopThread(Thread):
143 """Run a pyzmq ioloop in a thread to send and receive messages
144 """Run a pyzmq ioloop in a thread to send and receive messages
144 """
145 """
145 def __init__(self, loop):
146 def __init__(self, loop):
146 super(IOLoopThread, self).__init__()
147 super(IOLoopThread, self).__init__()
147 self.daemon = True
148 self.daemon = True
148 atexit.register(self._notice_exit)
149 atexit.register(self._notice_exit)
149 self.ioloop = loop or ioloop.IOLoop()
150 self.ioloop = loop or ioloop.IOLoop()
150
151
151 def _notice_exit(self):
152 def _notice_exit(self):
152 self._exiting = True
153 self._exiting = True
153
154
154 def run(self):
155 def run(self):
155 """Run my loop, ignoring EINTR events in the poller"""
156 """Run my loop, ignoring EINTR events in the poller"""
156 while True:
157 while True:
157 try:
158 try:
158 self.ioloop.start()
159 self.ioloop.start()
159 except ZMQError as e:
160 except ZMQError as e:
160 if e.errno == errno.EINTR:
161 if e.errno == errno.EINTR:
161 continue
162 continue
162 else:
163 else:
163 raise
164 raise
164 except Exception:
165 except Exception:
165 if self._exiting:
166 if self._exiting:
166 break
167 break
167 else:
168 else:
168 raise
169 raise
169 else:
170 else:
170 break
171 break
171
172
172 def stop(self):
173 def stop(self):
173 """Stop the channel's event loop and join its thread.
174 """Stop the channel's event loop and join its thread.
174
175
175 This calls :meth:`~threading.Thread.join` and returns when the thread
176 This calls :meth:`~threading.Thread.join` and returns when the thread
176 terminates. :class:`RuntimeError` will be raised if
177 terminates. :class:`RuntimeError` will be raised if
177 :meth:`~threading.Thread.start` is called again.
178 :meth:`~threading.Thread.start` is called again.
178 """
179 """
179 if self.ioloop is not None:
180 if self.ioloop is not None:
180 self.ioloop.stop()
181 self.ioloop.stop()
181 self.join()
182 self.join()
182 self.close()
183 self.close()
183
184
184 def close(self):
185 def close(self):
185 if self.ioloop is not None:
186 if self.ioloop is not None:
186 try:
187 try:
187 self.ioloop.close(all_fds=True)
188 self.ioloop.close(all_fds=True)
188 except Exception:
189 except Exception:
189 pass
190 pass
190
191
191
192
192 class ThreadedKernelClient(KernelClient):
193 class ThreadedKernelClient(KernelClient):
193 """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
194 """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
194 """
195 """
195
196
196 _ioloop = None
197 _ioloop = None
197 @property
198 @property
198 def ioloop(self):
199 def ioloop(self):
199 if self._ioloop is None:
200 if self._ioloop is None:
200 self._ioloop = ioloop.IOLoop()
201 self._ioloop = ioloop.IOLoop()
201 return self._ioloop
202 return self._ioloop
202
203
203 ioloop_thread = Instance(IOLoopThread)
204 ioloop_thread = Instance(IOLoopThread)
204
205
205 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
206 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
206 if shell:
207 if shell:
207 self.shell_channel._inspect = self._check_kernel_info_reply
208 self.shell_channel._inspect = self._check_kernel_info_reply
208
209
209 self.ioloop_thread = IOLoopThread(self.ioloop)
210 self.ioloop_thread = IOLoopThread(self.ioloop)
210 self.ioloop_thread.start()
211 self.ioloop_thread.start()
211
212
212 super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
213 super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
213
214
214 def _check_kernel_info_reply(self, msg):
215 def _check_kernel_info_reply(self, msg):
215 """This is run in the ioloop thread when the kernel info reply is recieved
216 """This is run in the ioloop thread when the kernel info reply is recieved
216 """
217 """
217 if msg['msg_type'] == 'kernel_info_reply':
218 if msg['msg_type'] == 'kernel_info_reply':
218 self._handle_kernel_info_reply(msg)
219 self._handle_kernel_info_reply(msg)
219 self.shell_channel._inspect = None
220 self.shell_channel._inspect = None
220
221
221 def stop_channels(self):
222 def stop_channels(self):
222 super(ThreadedKernelClient, self).stop_channels()
223 super(ThreadedKernelClient, self).stop_channels()
223 if self.ioloop_thread.is_alive():
224 if self.ioloop_thread.is_alive():
224 self.ioloop_thread.stop()
225 self.ioloop_thread.stop()
225
226
226 iopub_channel_class = Type(ThreadedZMQSocketChannel)
227 iopub_channel_class = Type(ThreadedZMQSocketChannel)
227 shell_channel_class = Type(ThreadedZMQSocketChannel)
228 shell_channel_class = Type(ThreadedZMQSocketChannel)
228 stdin_channel_class = Type(ThreadedZMQSocketChannel)
229 stdin_channel_class = Type(ThreadedZMQSocketChannel)
229 hb_channel_class = Type(HBChannel)
230 hb_channel_class = Type(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now