Show More
@@ -1,229 +1,230 | |||||
1 | """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies. |
|
1 | """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies. | |
2 | """ |
|
2 | """ | |
|
3 | from __future__ import absolute_import | |||
3 | import atexit |
|
4 | import atexit | |
4 | import errno |
|
5 | import errno | |
5 | from threading import Thread |
|
6 | from threading import Thread | |
6 | import time |
|
7 | import time | |
7 |
|
8 | |||
8 | import zmq |
|
9 | import zmq | |
9 | # import ZMQError in top-level namespace, to avoid ugly attribute-error messages |
|
10 | # import ZMQError in top-level namespace, to avoid ugly attribute-error messages | |
10 | # during garbage collection of threads at exit: |
|
11 | # during garbage collection of threads at exit: | |
11 | from zmq import ZMQError |
|
12 | from zmq import ZMQError | |
12 | from zmq.eventloop import ioloop, zmqstream |
|
13 | from zmq.eventloop import ioloop, zmqstream | |
13 |
|
14 | |||
14 | # Local imports |
|
15 | # Local imports | |
15 | from IPython.utils.traitlets import Type, Instance |
|
16 | from IPython.utils.traitlets import Type, Instance | |
16 | from IPython.kernel.channels import HBChannel |
|
17 | from IPython.kernel.channels import HBChannel | |
17 | from IPython.kernel import KernelClient |
|
18 | from IPython.kernel import KernelClient | |
18 | from IPython.kernel.channels import HBChannel |
|
19 | from IPython.kernel.channels import HBChannel | |
19 |
|
20 | |||
20 | class ThreadedZMQSocketChannel(object): |
|
21 | class ThreadedZMQSocketChannel(object): | |
21 | """A ZMQ socket invoking a callback in the ioloop""" |
|
22 | """A ZMQ socket invoking a callback in the ioloop""" | |
22 | session = None |
|
23 | session = None | |
23 | socket = None |
|
24 | socket = None | |
24 | ioloop = None |
|
25 | ioloop = None | |
25 | stream = None |
|
26 | stream = None | |
26 | _inspect = None |
|
27 | _inspect = None | |
27 |
|
28 | |||
28 | def __init__(self, socket, session, loop): |
|
29 | def __init__(self, socket, session, loop): | |
29 | """Create a channel. |
|
30 | """Create a channel. | |
30 |
|
31 | |||
31 | Parameters |
|
32 | Parameters | |
32 | ---------- |
|
33 | ---------- | |
33 | socket : :class:`zmq.Socket` |
|
34 | socket : :class:`zmq.Socket` | |
34 | The ZMQ socket to use. |
|
35 | The ZMQ socket to use. | |
35 | session : :class:`session.Session` |
|
36 | session : :class:`session.Session` | |
36 | The session to use. |
|
37 | The session to use. | |
37 | loop |
|
38 | loop | |
38 | A pyzmq ioloop to connect the socket to using a ZMQStream |
|
39 | A pyzmq ioloop to connect the socket to using a ZMQStream | |
39 | """ |
|
40 | """ | |
40 | super(ThreadedZMQSocketChannel, self).__init__() |
|
41 | super(ThreadedZMQSocketChannel, self).__init__() | |
41 |
|
42 | |||
42 | self.socket = socket |
|
43 | self.socket = socket | |
43 | self.session = session |
|
44 | self.session = session | |
44 | self.ioloop = loop |
|
45 | self.ioloop = loop | |
45 |
|
46 | |||
46 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) |
|
47 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
47 | self.stream.on_recv(self._handle_recv) |
|
48 | self.stream.on_recv(self._handle_recv) | |
48 |
|
49 | |||
49 | _is_alive = False |
|
50 | _is_alive = False | |
50 | def is_alive(self): |
|
51 | def is_alive(self): | |
51 | return self._is_alive |
|
52 | return self._is_alive | |
52 |
|
53 | |||
53 | def start(self): |
|
54 | def start(self): | |
54 | self._is_alive = True |
|
55 | self._is_alive = True | |
55 |
|
56 | |||
56 | def stop(self): |
|
57 | def stop(self): | |
57 | self._is_alive = False |
|
58 | self._is_alive = False | |
58 |
|
59 | |||
59 | def close(self): |
|
60 | def close(self): | |
60 | if self.socket is not None: |
|
61 | if self.socket is not None: | |
61 | try: |
|
62 | try: | |
62 | self.socket.close(linger=0) |
|
63 | self.socket.close(linger=0) | |
63 | except Exception: |
|
64 | except Exception: | |
64 | pass |
|
65 | pass | |
65 | self.socket = None |
|
66 | self.socket = None | |
66 |
|
67 | |||
67 | def send(self, msg): |
|
68 | def send(self, msg): | |
68 | """Queue a message to be sent from the IOLoop's thread. |
|
69 | """Queue a message to be sent from the IOLoop's thread. | |
69 |
|
70 | |||
70 | Parameters |
|
71 | Parameters | |
71 | ---------- |
|
72 | ---------- | |
72 | msg : message to send |
|
73 | msg : message to send | |
73 |
|
74 | |||
74 | This is threadsafe, as it uses IOLoop.add_callback to give the loop's |
|
75 | This is threadsafe, as it uses IOLoop.add_callback to give the loop's | |
75 | thread control of the action. |
|
76 | thread control of the action. | |
76 | """ |
|
77 | """ | |
77 | def thread_send(): |
|
78 | def thread_send(): | |
78 | self.session.send(self.stream, msg) |
|
79 | self.session.send(self.stream, msg) | |
79 | self.ioloop.add_callback(thread_send) |
|
80 | self.ioloop.add_callback(thread_send) | |
80 |
|
81 | |||
81 | def _handle_recv(self, msg): |
|
82 | def _handle_recv(self, msg): | |
82 | """Callback for stream.on_recv. |
|
83 | """Callback for stream.on_recv. | |
83 |
|
84 | |||
84 | Unpacks message, and calls handlers with it. |
|
85 | Unpacks message, and calls handlers with it. | |
85 | """ |
|
86 | """ | |
86 | ident,smsg = self.session.feed_identities(msg) |
|
87 | ident,smsg = self.session.feed_identities(msg) | |
87 | msg = self.session.deserialize(smsg) |
|
88 | msg = self.session.deserialize(smsg) | |
88 | # let client inspect messages |
|
89 | # let client inspect messages | |
89 | if self._inspect: |
|
90 | if self._inspect: | |
90 | self._inspect(msg) |
|
91 | self._inspect(msg) | |
91 | self.call_handlers(msg) |
|
92 | self.call_handlers(msg) | |
92 |
|
93 | |||
93 | def call_handlers(self, msg): |
|
94 | def call_handlers(self, msg): | |
94 | """This method is called in the ioloop thread when a message arrives. |
|
95 | """This method is called in the ioloop thread when a message arrives. | |
95 |
|
96 | |||
96 | Subclasses should override this method to handle incoming messages. |
|
97 | Subclasses should override this method to handle incoming messages. | |
97 | It is important to remember that this method is called in the thread |
|
98 | It is important to remember that this method is called in the thread | |
98 | so that some logic must be done to ensure that the application level |
|
99 | so that some logic must be done to ensure that the application level | |
99 | handlers are called in the application thread. |
|
100 | handlers are called in the application thread. | |
100 | """ |
|
101 | """ | |
101 | pass |
|
102 | pass | |
102 |
|
103 | |||
103 | def process_events(self): |
|
104 | def process_events(self): | |
104 | """Subclasses should override this with a method |
|
105 | """Subclasses should override this with a method | |
105 | processing any pending GUI events. |
|
106 | processing any pending GUI events. | |
106 | """ |
|
107 | """ | |
107 | pass |
|
108 | pass | |
108 |
|
109 | |||
109 |
|
110 | |||
110 | def flush(self, timeout=1.0): |
|
111 | def flush(self, timeout=1.0): | |
111 | """Immediately processes all pending messages on this channel. |
|
112 | """Immediately processes all pending messages on this channel. | |
112 |
|
113 | |||
113 | This is only used for the IOPub channel. |
|
114 | This is only used for the IOPub channel. | |
114 |
|
115 | |||
115 | Callers should use this method to ensure that :meth:`call_handlers` |
|
116 | Callers should use this method to ensure that :meth:`call_handlers` | |
116 | has been called for all messages that have been received on the |
|
117 | has been called for all messages that have been received on the | |
117 | 0MQ SUB socket of this channel. |
|
118 | 0MQ SUB socket of this channel. | |
118 |
|
119 | |||
119 | This method is thread safe. |
|
120 | This method is thread safe. | |
120 |
|
121 | |||
121 | Parameters |
|
122 | Parameters | |
122 | ---------- |
|
123 | ---------- | |
123 | timeout : float, optional |
|
124 | timeout : float, optional | |
124 | The maximum amount of time to spend flushing, in seconds. The |
|
125 | The maximum amount of time to spend flushing, in seconds. The | |
125 | default is one second. |
|
126 | default is one second. | |
126 | """ |
|
127 | """ | |
127 | # We do the IOLoop callback process twice to ensure that the IOLoop |
|
128 | # We do the IOLoop callback process twice to ensure that the IOLoop | |
128 | # gets to perform at least one full poll. |
|
129 | # gets to perform at least one full poll. | |
129 | stop_time = time.time() + timeout |
|
130 | stop_time = time.time() + timeout | |
130 | for i in range(2): |
|
131 | for i in range(2): | |
131 | self._flushed = False |
|
132 | self._flushed = False | |
132 | self.ioloop.add_callback(self._flush) |
|
133 | self.ioloop.add_callback(self._flush) | |
133 | while not self._flushed and time.time() < stop_time: |
|
134 | while not self._flushed and time.time() < stop_time: | |
134 | time.sleep(0.01) |
|
135 | time.sleep(0.01) | |
135 |
|
136 | |||
136 | def _flush(self): |
|
137 | def _flush(self): | |
137 | """Callback for :method:`self.flush`.""" |
|
138 | """Callback for :method:`self.flush`.""" | |
138 | self.stream.flush() |
|
139 | self.stream.flush() | |
139 | self._flushed = True |
|
140 | self._flushed = True | |
140 |
|
141 | |||
141 |
|
142 | |||
142 | class IOLoopThread(Thread): |
|
143 | class IOLoopThread(Thread): | |
143 | """Run a pyzmq ioloop in a thread to send and receive messages |
|
144 | """Run a pyzmq ioloop in a thread to send and receive messages | |
144 | """ |
|
145 | """ | |
145 | def __init__(self, loop): |
|
146 | def __init__(self, loop): | |
146 | super(IOLoopThread, self).__init__() |
|
147 | super(IOLoopThread, self).__init__() | |
147 | self.daemon = True |
|
148 | self.daemon = True | |
148 | atexit.register(self._notice_exit) |
|
149 | atexit.register(self._notice_exit) | |
149 | self.ioloop = loop or ioloop.IOLoop() |
|
150 | self.ioloop = loop or ioloop.IOLoop() | |
150 |
|
151 | |||
151 | def _notice_exit(self): |
|
152 | def _notice_exit(self): | |
152 | self._exiting = True |
|
153 | self._exiting = True | |
153 |
|
154 | |||
154 | def run(self): |
|
155 | def run(self): | |
155 | """Run my loop, ignoring EINTR events in the poller""" |
|
156 | """Run my loop, ignoring EINTR events in the poller""" | |
156 | while True: |
|
157 | while True: | |
157 | try: |
|
158 | try: | |
158 | self.ioloop.start() |
|
159 | self.ioloop.start() | |
159 | except ZMQError as e: |
|
160 | except ZMQError as e: | |
160 | if e.errno == errno.EINTR: |
|
161 | if e.errno == errno.EINTR: | |
161 | continue |
|
162 | continue | |
162 | else: |
|
163 | else: | |
163 | raise |
|
164 | raise | |
164 | except Exception: |
|
165 | except Exception: | |
165 | if self._exiting: |
|
166 | if self._exiting: | |
166 | break |
|
167 | break | |
167 | else: |
|
168 | else: | |
168 | raise |
|
169 | raise | |
169 | else: |
|
170 | else: | |
170 | break |
|
171 | break | |
171 |
|
172 | |||
172 | def stop(self): |
|
173 | def stop(self): | |
173 | """Stop the channel's event loop and join its thread. |
|
174 | """Stop the channel's event loop and join its thread. | |
174 |
|
175 | |||
175 | This calls :meth:`~threading.Thread.join` and returns when the thread |
|
176 | This calls :meth:`~threading.Thread.join` and returns when the thread | |
176 | terminates. :class:`RuntimeError` will be raised if |
|
177 | terminates. :class:`RuntimeError` will be raised if | |
177 | :meth:`~threading.Thread.start` is called again. |
|
178 | :meth:`~threading.Thread.start` is called again. | |
178 | """ |
|
179 | """ | |
179 | if self.ioloop is not None: |
|
180 | if self.ioloop is not None: | |
180 | self.ioloop.stop() |
|
181 | self.ioloop.stop() | |
181 | self.join() |
|
182 | self.join() | |
182 | self.close() |
|
183 | self.close() | |
183 |
|
184 | |||
184 | def close(self): |
|
185 | def close(self): | |
185 | if self.ioloop is not None: |
|
186 | if self.ioloop is not None: | |
186 | try: |
|
187 | try: | |
187 | self.ioloop.close(all_fds=True) |
|
188 | self.ioloop.close(all_fds=True) | |
188 | except Exception: |
|
189 | except Exception: | |
189 | pass |
|
190 | pass | |
190 |
|
191 | |||
191 |
|
192 | |||
192 | class ThreadedKernelClient(KernelClient): |
|
193 | class ThreadedKernelClient(KernelClient): | |
193 | """ A KernelClient that provides thread-safe sockets with async callbacks on message replies. |
|
194 | """ A KernelClient that provides thread-safe sockets with async callbacks on message replies. | |
194 | """ |
|
195 | """ | |
195 |
|
196 | |||
196 | _ioloop = None |
|
197 | _ioloop = None | |
197 | @property |
|
198 | @property | |
198 | def ioloop(self): |
|
199 | def ioloop(self): | |
199 | if self._ioloop is None: |
|
200 | if self._ioloop is None: | |
200 | self._ioloop = ioloop.IOLoop() |
|
201 | self._ioloop = ioloop.IOLoop() | |
201 | return self._ioloop |
|
202 | return self._ioloop | |
202 |
|
203 | |||
203 | ioloop_thread = Instance(IOLoopThread) |
|
204 | ioloop_thread = Instance(IOLoopThread) | |
204 |
|
205 | |||
205 | def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): |
|
206 | def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): | |
206 | if shell: |
|
207 | if shell: | |
207 | self.shell_channel._inspect = self._check_kernel_info_reply |
|
208 | self.shell_channel._inspect = self._check_kernel_info_reply | |
208 |
|
209 | |||
209 | self.ioloop_thread = IOLoopThread(self.ioloop) |
|
210 | self.ioloop_thread = IOLoopThread(self.ioloop) | |
210 | self.ioloop_thread.start() |
|
211 | self.ioloop_thread.start() | |
211 |
|
212 | |||
212 | super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb) |
|
213 | super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb) | |
213 |
|
214 | |||
214 | def _check_kernel_info_reply(self, msg): |
|
215 | def _check_kernel_info_reply(self, msg): | |
215 | """This is run in the ioloop thread when the kernel info reply is recieved |
|
216 | """This is run in the ioloop thread when the kernel info reply is recieved | |
216 | """ |
|
217 | """ | |
217 | if msg['msg_type'] == 'kernel_info_reply': |
|
218 | if msg['msg_type'] == 'kernel_info_reply': | |
218 | self._handle_kernel_info_reply(msg) |
|
219 | self._handle_kernel_info_reply(msg) | |
219 | self.shell_channel._inspect = None |
|
220 | self.shell_channel._inspect = None | |
220 |
|
221 | |||
221 | def stop_channels(self): |
|
222 | def stop_channels(self): | |
222 | super(ThreadedKernelClient, self).stop_channels() |
|
223 | super(ThreadedKernelClient, self).stop_channels() | |
223 | if self.ioloop_thread.is_alive(): |
|
224 | if self.ioloop_thread.is_alive(): | |
224 | self.ioloop_thread.stop() |
|
225 | self.ioloop_thread.stop() | |
225 |
|
226 | |||
226 | iopub_channel_class = Type(ThreadedZMQSocketChannel) |
|
227 | iopub_channel_class = Type(ThreadedZMQSocketChannel) | |
227 | shell_channel_class = Type(ThreadedZMQSocketChannel) |
|
228 | shell_channel_class = Type(ThreadedZMQSocketChannel) | |
228 | stdin_channel_class = Type(ThreadedZMQSocketChannel) |
|
229 | stdin_channel_class = Type(ThreadedZMQSocketChannel) | |
229 | hb_channel_class = Type(HBChannel) |
|
230 | hb_channel_class = Type(HBChannel) |
General Comments 0
You need to be logged in to leave comments.
Login now