Show More
@@ -3,6 +3,7 b'' | |||
|
3 | 3 | |
|
4 | 4 | # System library imports. |
|
5 | 5 | from PyQt4 import QtCore |
|
6 | import zmq | |
|
6 | 7 | |
|
7 | 8 | # IPython imports. |
|
8 | 9 | from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ |
@@ -10,6 +11,7 b' from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \\' | |||
|
10 | 11 | from util import MetaQObjectHasTraits |
|
11 | 12 | |
|
12 | 13 | |
|
14 | ||
|
13 | 15 | class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): |
|
14 | 16 | |
|
15 | 17 | # Emitted when any message is received. |
@@ -95,6 +97,7 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):' | |||
|
95 | 97 | """ Reimplemented to skip callback handling. |
|
96 | 98 | """ |
|
97 | 99 | self.command_queue.put(msg) |
|
100 | self.add_io_state(zmq.POLLOUT) | |
|
98 | 101 | |
|
99 | 102 | |
|
100 | 103 | class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): |
@@ -34,6 +34,12 b' class ZmqSocketChannel(Thread):' | |||
|
34 | 34 | """ The base class for the channels that use ZMQ sockets. |
|
35 | 35 | """ |
|
36 | 36 | |
|
37 | context = None | |
|
38 | session = None | |
|
39 | socket = None | |
|
40 | ioloop = None | |
|
41 | iostate = None | |
|
42 | ||
|
37 | 43 | def __init__(self, context, session, address=None): |
|
38 | 44 | super(ZmqSocketChannel, self).__init__() |
|
39 | 45 | self.daemon = True |
@@ -41,7 +47,6 b' class ZmqSocketChannel(Thread):' | |||
|
41 | 47 | self.context = context |
|
42 | 48 | self.session = session |
|
43 | 49 | self.address = address |
|
44 | self.socket = None | |
|
45 | 50 | |
|
46 | 51 | def stop(self): |
|
47 | 52 | """Stop the thread's activity. Returns when the thread terminates. |
@@ -73,6 +78,28 b' class ZmqSocketChannel(Thread):' | |||
|
73 | 78 | |
|
74 | 79 | address = property(get_address, set_adresss) |
|
75 | 80 | |
|
81 | def add_io_state(self, state): | |
|
82 | """Add IO state to the eventloop. | |
|
83 | ||
|
84 | This is thread safe as it uses the thread safe IOLoop.add_callback. | |
|
85 | """ | |
|
86 | def add_io_state_callback(): | |
|
87 | if not self.iostate & state: | |
|
88 | self.iostate = self.iostate | state | |
|
89 | self.ioloop.update_handler(self.socket, self.iostate) | |
|
90 | self.ioloop.add_callback(add_io_state_callback) | |
|
91 | ||
|
92 | def drop_io_state(self, state): | |
|
93 | """Drop IO state from the eventloop. | |
|
94 | ||
|
95 | This is thread safe as it uses the thread safe IOLoop.add_callback. | |
|
96 | """ | |
|
97 | def drop_io_state_callback(): | |
|
98 | if self.iostate & state: | |
|
99 | self.iostate = self.iostate & (~state) | |
|
100 | self.ioloop.update_handler(self.socket, self.iostate) | |
|
101 | self.ioloop.add_callback(drop_io_state_callback) | |
|
102 | ||
|
76 | 103 | |
|
77 | 104 | class SubSocketChannel(ZmqSocketChannel): |
|
78 | 105 | |
@@ -85,8 +112,9 b' class SubSocketChannel(ZmqSocketChannel):' | |||
|
85 | 112 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
86 | 113 | self.socket.connect('tcp://%s:%i' % self.address) |
|
87 | 114 | self.ioloop = ioloop.IOLoop() |
|
115 | self.iostate = POLLIN|POLLERR | |
|
88 | 116 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
89 |
|
|
|
117 | self.iostate) | |
|
90 | 118 | self.ioloop.start() |
|
91 | 119 | |
|
92 | 120 | def stop(self): |
@@ -171,8 +199,9 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
171 | 199 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
172 | 200 | self.socket.connect('tcp://%s:%i' % self.address) |
|
173 | 201 | self.ioloop = ioloop.IOLoop() |
|
202 | self.iostate = POLLERR|POLLIN | |
|
174 | 203 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
175 |
|
|
|
204 | self.iostate) | |
|
176 | 205 | self.ioloop.start() |
|
177 | 206 | |
|
178 | 207 | def stop(self): |
@@ -180,7 +209,6 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
180 | 209 | super(XReqSocketChannel, self).stop() |
|
181 | 210 | |
|
182 | 211 | def _handle_events(self, socket, events): |
|
183 | # Turn on and off POLLOUT depending on if we have made a request | |
|
184 | 212 | if events & POLLERR: |
|
185 | 213 | self._handle_err() |
|
186 | 214 | if events & POLLOUT: |
@@ -199,6 +227,8 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
199 | 227 | pass |
|
200 | 228 | else: |
|
201 | 229 | self.socket.send_json(msg) |
|
230 | if self.command_queue.empty(): | |
|
231 | self.drop_io_state(POLLOUT) | |
|
202 | 232 | |
|
203 | 233 | def _handle_err(self): |
|
204 | 234 | # We don't want to let this go silently, so eventually we should log. |
@@ -208,6 +238,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
208 | 238 | handler = self._find_handler(msg['msg_type'], callback) |
|
209 | 239 | self.handler_queue.put(handler) |
|
210 | 240 | self.command_queue.put(msg) |
|
241 | self.add_io_state(POLLOUT) | |
|
211 | 242 | |
|
212 | 243 | def execute(self, code, callback=None): |
|
213 | 244 | # Create class for content/msg creation. Related to, but possibly |
General Comments 0
You need to be logged in to leave comments.
Login now