##// END OF EJS Templates
fix `zmq` module conflict
Björn Linse -
Show More
@@ -1,229 +1,230 b''
1 1 """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
2 2 """
3 from __future__ import absolute_import
3 4 import atexit
4 5 import errno
5 6 from threading import Thread
6 7 import time
7 8
8 9 import zmq
9 10 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 11 # during garbage collection of threads at exit:
11 12 from zmq import ZMQError
12 13 from zmq.eventloop import ioloop, zmqstream
13 14
14 15 # Local imports
15 16 from IPython.utils.traitlets import Type, Instance
16 17 from IPython.kernel.channels import HBChannel
17 18 from IPython.kernel import KernelClient
18 19 from IPython.kernel.channels import HBChannel
19 20
20 21 class ThreadedZMQSocketChannel(object):
21 22 """A ZMQ socket invoking a callback in the ioloop"""
22 23 session = None
23 24 socket = None
24 25 ioloop = None
25 26 stream = None
26 27 _inspect = None
27 28
28 29 def __init__(self, socket, session, loop):
29 30 """Create a channel.
30 31
31 32 Parameters
32 33 ----------
33 34 socket : :class:`zmq.Socket`
34 35 The ZMQ socket to use.
35 36 session : :class:`session.Session`
36 37 The session to use.
37 38 loop
38 39 A pyzmq ioloop to connect the socket to using a ZMQStream
39 40 """
40 41 super(ThreadedZMQSocketChannel, self).__init__()
41 42
42 43 self.socket = socket
43 44 self.session = session
44 45 self.ioloop = loop
45 46
46 47 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
47 48 self.stream.on_recv(self._handle_recv)
48 49
49 50 _is_alive = False
50 51 def is_alive(self):
51 52 return self._is_alive
52 53
53 54 def start(self):
54 55 self._is_alive = True
55 56
56 57 def stop(self):
57 58 self._is_alive = False
58 59
59 60 def close(self):
60 61 if self.socket is not None:
61 62 try:
62 63 self.socket.close(linger=0)
63 64 except Exception:
64 65 pass
65 66 self.socket = None
66 67
67 68 def send(self, msg):
68 69 """Queue a message to be sent from the IOLoop's thread.
69 70
70 71 Parameters
71 72 ----------
72 73 msg : message to send
73 74
74 75 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
75 76 thread control of the action.
76 77 """
77 78 def thread_send():
78 79 self.session.send(self.stream, msg)
79 80 self.ioloop.add_callback(thread_send)
80 81
81 82 def _handle_recv(self, msg):
82 83 """Callback for stream.on_recv.
83 84
84 85 Unpacks message, and calls handlers with it.
85 86 """
86 87 ident,smsg = self.session.feed_identities(msg)
87 88 msg = self.session.deserialize(smsg)
88 89 # let client inspect messages
89 90 if self._inspect:
90 91 self._inspect(msg)
91 92 self.call_handlers(msg)
92 93
93 94 def call_handlers(self, msg):
94 95 """This method is called in the ioloop thread when a message arrives.
95 96
96 97 Subclasses should override this method to handle incoming messages.
97 98 It is important to remember that this method is called in the thread
98 99 so that some logic must be done to ensure that the application level
99 100 handlers are called in the application thread.
100 101 """
101 102 pass
102 103
103 104 def process_events(self):
104 105 """Subclasses should override this with a method
105 106 processing any pending GUI events.
106 107 """
107 108 pass
108 109
109 110
110 111 def flush(self, timeout=1.0):
111 112 """Immediately processes all pending messages on this channel.
112 113
113 114 This is only used for the IOPub channel.
114 115
115 116 Callers should use this method to ensure that :meth:`call_handlers`
116 117 has been called for all messages that have been received on the
117 118 0MQ SUB socket of this channel.
118 119
119 120 This method is thread safe.
120 121
121 122 Parameters
122 123 ----------
123 124 timeout : float, optional
124 125 The maximum amount of time to spend flushing, in seconds. The
125 126 default is one second.
126 127 """
127 128 # We do the IOLoop callback process twice to ensure that the IOLoop
128 129 # gets to perform at least one full poll.
129 130 stop_time = time.time() + timeout
130 131 for i in range(2):
131 132 self._flushed = False
132 133 self.ioloop.add_callback(self._flush)
133 134 while not self._flushed and time.time() < stop_time:
134 135 time.sleep(0.01)
135 136
136 137 def _flush(self):
137 138 """Callback for :method:`self.flush`."""
138 139 self.stream.flush()
139 140 self._flushed = True
140 141
141 142
142 143 class IOLoopThread(Thread):
143 144 """Run a pyzmq ioloop in a thread to send and receive messages
144 145 """
145 146 def __init__(self, loop):
146 147 super(IOLoopThread, self).__init__()
147 148 self.daemon = True
148 149 atexit.register(self._notice_exit)
149 150 self.ioloop = loop or ioloop.IOLoop()
150 151
151 152 def _notice_exit(self):
152 153 self._exiting = True
153 154
154 155 def run(self):
155 156 """Run my loop, ignoring EINTR events in the poller"""
156 157 while True:
157 158 try:
158 159 self.ioloop.start()
159 160 except ZMQError as e:
160 161 if e.errno == errno.EINTR:
161 162 continue
162 163 else:
163 164 raise
164 165 except Exception:
165 166 if self._exiting:
166 167 break
167 168 else:
168 169 raise
169 170 else:
170 171 break
171 172
172 173 def stop(self):
173 174 """Stop the channel's event loop and join its thread.
174 175
175 176 This calls :meth:`~threading.Thread.join` and returns when the thread
176 177 terminates. :class:`RuntimeError` will be raised if
177 178 :meth:`~threading.Thread.start` is called again.
178 179 """
179 180 if self.ioloop is not None:
180 181 self.ioloop.stop()
181 182 self.join()
182 183 self.close()
183 184
184 185 def close(self):
185 186 if self.ioloop is not None:
186 187 try:
187 188 self.ioloop.close(all_fds=True)
188 189 except Exception:
189 190 pass
190 191
191 192
192 193 class ThreadedKernelClient(KernelClient):
193 194 """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
194 195 """
195 196
196 197 _ioloop = None
197 198 @property
198 199 def ioloop(self):
199 200 if self._ioloop is None:
200 201 self._ioloop = ioloop.IOLoop()
201 202 return self._ioloop
202 203
203 204 ioloop_thread = Instance(IOLoopThread)
204 205
205 206 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
206 207 if shell:
207 208 self.shell_channel._inspect = self._check_kernel_info_reply
208 209
209 210 self.ioloop_thread = IOLoopThread(self.ioloop)
210 211 self.ioloop_thread.start()
211 212
212 213 super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
213 214
214 215 def _check_kernel_info_reply(self, msg):
215 216 """This is run in the ioloop thread when the kernel info reply is recieved
216 217 """
217 218 if msg['msg_type'] == 'kernel_info_reply':
218 219 self._handle_kernel_info_reply(msg)
219 220 self.shell_channel._inspect = None
220 221
221 222 def stop_channels(self):
222 223 super(ThreadedKernelClient, self).stop_channels()
223 224 if self.ioloop_thread.is_alive():
224 225 self.ioloop_thread.stop()
225 226
226 227 iopub_channel_class = Type(ThreadedZMQSocketChannel)
227 228 shell_channel_class = Type(ThreadedZMQSocketChannel)
228 229 stdin_channel_class = Type(ThreadedZMQSocketChannel)
229 230 hb_channel_class = Type(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now